author    AlexSm <alex@ydb.tech>    2024-03-05 10:40:59 +0100
committer GitHub <noreply@github.com>    2024-03-05 12:40:59 +0300
commit    1ac13c847b5358faba44dbb638a828e24369467b (patch)
tree      07672b4dd3604ad3dee540a02c6494cb7d10dc3d /contrib/tools/python3/Lib/multiprocessing
parent    ffcca3e7f7958ddc6487b91d3df8c01054bd0638 (diff)
download  ydb-1ac13c847b5358faba44dbb638a828e24369467b.tar.gz
Library import 16 (#2433)
Co-authored-by: robot-piglet <robot-piglet@yandex-team.com>
Co-authored-by: deshevoy <deshevoy@yandex-team.com>
Co-authored-by: robot-contrib <robot-contrib@yandex-team.com>
Co-authored-by: thegeorg <thegeorg@yandex-team.com>
Co-authored-by: robot-ya-builder <robot-ya-builder@yandex-team.com>
Co-authored-by: svidyuk <svidyuk@yandex-team.com>
Co-authored-by: shadchin <shadchin@yandex-team.com>
Co-authored-by: robot-ratatosk <robot-ratatosk@yandex-team.com>
Co-authored-by: innokentii <innokentii@yandex-team.com>
Co-authored-by: arkady-e1ppa <arkady-e1ppa@yandex-team.com>
Co-authored-by: snermolaev <snermolaev@yandex-team.com>
Co-authored-by: dimdim11 <dimdim11@yandex-team.com>
Co-authored-by: kickbutt <kickbutt@yandex-team.com>
Co-authored-by: abdullinsaid <abdullinsaid@yandex-team.com>
Co-authored-by: korsunandrei <korsunandrei@yandex-team.com>
Co-authored-by: petrk <petrk@yandex-team.com>
Co-authored-by: miroslav2 <miroslav2@yandex-team.com>
Co-authored-by: serjflint <serjflint@yandex-team.com>
Co-authored-by: akhropov <akhropov@yandex-team.com>
Co-authored-by: prettyboy <prettyboy@yandex-team.com>
Co-authored-by: ilikepugs <ilikepugs@yandex-team.com>
Co-authored-by: hiddenpath <hiddenpath@yandex-team.com>
Co-authored-by: mikhnenko <mikhnenko@yandex-team.com>
Co-authored-by: spreis <spreis@yandex-team.com>
Co-authored-by: andreyshspb <andreyshspb@yandex-team.com>
Co-authored-by: dimaandreev <dimaandreev@yandex-team.com>
Co-authored-by: rashid <rashid@yandex-team.com>
Co-authored-by: robot-ydb-importer <robot-ydb-importer@yandex-team.com>
Co-authored-by: r-vetrov <r-vetrov@yandex-team.com>
Co-authored-by: ypodlesov <ypodlesov@yandex-team.com>
Co-authored-by: zaverden <zaverden@yandex-team.com>
Co-authored-by: vpozdyayev <vpozdyayev@yandex-team.com>
Co-authored-by: robot-cozmo <robot-cozmo@yandex-team.com>
Co-authored-by: v-korovin <v-korovin@yandex-team.com>
Co-authored-by: arikon <arikon@yandex-team.com>
Co-authored-by: khoden <khoden@yandex-team.com>
Co-authored-by: psydmm <psydmm@yandex-team.com>
Co-authored-by: robot-javacom <robot-javacom@yandex-team.com>
Co-authored-by: dtorilov <dtorilov@yandex-team.com>
Co-authored-by: sennikovmv <sennikovmv@yandex-team.com>
Co-authored-by: hcpp <hcpp@ydb.tech>
Diffstat (limited to 'contrib/tools/python3/Lib/multiprocessing')
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/__init__.py  37
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/connection.py  1177
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/context.py  377
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/dummy/__init__.py  126
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/dummy/connection.py  75
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/forkserver.py  348
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/heap.py  337
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/managers.py  1380
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/pool.py  957
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/popen_fork.py  83
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/popen_forkserver.py  74
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/popen_spawn_posix.py  72
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/popen_spawn_win32.py  146
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/process.py  439
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/queues.py  401
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/reduction.py  281
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/resource_sharer.py  154
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/resource_tracker.py  269
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/shared_memory.py  534
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/sharedctypes.py  240
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/spawn.py  307
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/synchronize.py  404
-rw-r--r--  contrib/tools/python3/Lib/multiprocessing/util.py  509
23 files changed, 8727 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/multiprocessing/__init__.py b/contrib/tools/python3/Lib/multiprocessing/__init__.py
new file mode 100644
index 0000000000..8336f381de
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/__init__.py
@@ -0,0 +1,37 @@
+#
+# Package analogous to 'threading.py' but using processes
+#
+# multiprocessing/__init__.py
+#
+# This package is intended to duplicate the functionality (and much of
+# the API) of threading.py but uses processes instead of threads. A
+# subpackage 'multiprocessing.dummy' has the same API but is a simple
+# wrapper for 'threading'.
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import sys
+from . import context
+
+#
+# Copy stuff from default context
+#
+
+__all__ = [x for x in dir(context._default_context) if not x.startswith('_')]
+globals().update((name, getattr(context._default_context, name)) for name in __all__)
+
+#
+# XXX These should not really be documented or public.
+#
+
+SUBDEBUG = 5
+SUBWARNING = 25
+
+#
+# Alias for main module -- will be reset by bootstrapping child processes
+#
+
+if '__main__' in sys.modules:
+ sys.modules['__mp_main__'] = sys.modules['__main__']
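The header comment above notes that the package mirrors the threading API using processes. A minimal usage sketch of that correspondence (plain stdlib multiprocessing, nothing specific to this import):

    import multiprocessing

    def worker(q):
        q.put('hello from child')

    if __name__ == '__main__':  # required under the 'spawn' start method
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=worker, args=(q,))
        p.start()
        print(q.get())  # 'hello from child'
        p.join()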
diff --git a/contrib/tools/python3/Lib/multiprocessing/connection.py b/contrib/tools/python3/Lib/multiprocessing/connection.py
new file mode 100644
index 0000000000..dbbf106f68
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/connection.py
@@ -0,0 +1,1177 @@
+#
+# A higher level module for using sockets (or Windows named pipes)
+#
+# multiprocessing/connection.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
+
+import errno
+import io
+import os
+import sys
+import socket
+import struct
+import time
+import tempfile
+import itertools
+
+import _multiprocessing
+
+from . import util
+
+from . import AuthenticationError, BufferTooShort
+from .context import reduction
+_ForkingPickler = reduction.ForkingPickler
+
+try:
+ import _winapi
+ from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
+except ImportError:
+ if sys.platform == 'win32':
+ raise
+ _winapi = None
+
+#
+#
+#
+
+BUFSIZE = 8192
+# A very generous timeout when it comes to local connections...
+CONNECTION_TIMEOUT = 20.
+
+_mmap_counter = itertools.count()
+
+default_family = 'AF_INET'
+families = ['AF_INET']
+
+if hasattr(socket, 'AF_UNIX'):
+ default_family = 'AF_UNIX'
+ families += ['AF_UNIX']
+
+if sys.platform == 'win32':
+ default_family = 'AF_PIPE'
+ families += ['AF_PIPE']
+
+
+def _init_timeout(timeout=CONNECTION_TIMEOUT):
+ return time.monotonic() + timeout
+
+def _check_timeout(t):
+ return time.monotonic() > t
+
+#
+#
+#
+
+def arbitrary_address(family):
+ '''
+ Return an arbitrary free address for the given family
+ '''
+ if family == 'AF_INET':
+ return ('localhost', 0)
+ elif family == 'AF_UNIX':
+ return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
+ elif family == 'AF_PIPE':
+ return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
+ (os.getpid(), next(_mmap_counter)), dir="")
+ else:
+ raise ValueError('unrecognized family')
+
+def _validate_family(family):
+ '''
+ Checks if the family is valid for the current environment.
+ '''
+ if sys.platform != 'win32' and family == 'AF_PIPE':
+ raise ValueError('Family %s is not recognized.' % family)
+
+ if sys.platform == 'win32' and family == 'AF_UNIX':
+ # double check
+ if not hasattr(socket, family):
+ raise ValueError('Family %s is not recognized.' % family)
+
+def address_type(address):
+ '''
+ Return the type of the address
+
+ This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
+ '''
+ if type(address) == tuple:
+ return 'AF_INET'
+ elif type(address) is str and address.startswith('\\\\'):
+ return 'AF_PIPE'
+ elif type(address) is str or util.is_abstract_socket_namespace(address):
+ return 'AF_UNIX'
+ else:
+ raise ValueError('address type of %r unrecognized' % address)
+
+#
+# Connection classes
+#
+
+class _ConnectionBase:
+ _handle = None
+
+ def __init__(self, handle, readable=True, writable=True):
+ handle = handle.__index__()
+ if handle < 0:
+ raise ValueError("invalid handle")
+ if not readable and not writable:
+ raise ValueError(
+ "at least one of `readable` and `writable` must be True")
+ self._handle = handle
+ self._readable = readable
+ self._writable = writable
+
+ # XXX should we use util.Finalize instead of a __del__?
+
+ def __del__(self):
+ if self._handle is not None:
+ self._close()
+
+ def _check_closed(self):
+ if self._handle is None:
+ raise OSError("handle is closed")
+
+ def _check_readable(self):
+ if not self._readable:
+ raise OSError("connection is write-only")
+
+ def _check_writable(self):
+ if not self._writable:
+ raise OSError("connection is read-only")
+
+ def _bad_message_length(self):
+ if self._writable:
+ self._readable = False
+ else:
+ self.close()
+ raise OSError("bad message length")
+
+ @property
+ def closed(self):
+ """True if the connection is closed"""
+ return self._handle is None
+
+ @property
+ def readable(self):
+ """True if the connection is readable"""
+ return self._readable
+
+ @property
+ def writable(self):
+ """True if the connection is writable"""
+ return self._writable
+
+ def fileno(self):
+ """File descriptor or handle of the connection"""
+ self._check_closed()
+ return self._handle
+
+ def close(self):
+ """Close the connection"""
+ if self._handle is not None:
+ try:
+ self._close()
+ finally:
+ self._handle = None
+
+ def send_bytes(self, buf, offset=0, size=None):
+ """Send the bytes data from a bytes-like object"""
+ self._check_closed()
+ self._check_writable()
+ m = memoryview(buf)
+ if m.itemsize > 1:
+ m = m.cast('B')
+ n = m.nbytes
+ if offset < 0:
+ raise ValueError("offset is negative")
+ if n < offset:
+ raise ValueError("buffer length < offset")
+ if size is None:
+ size = n - offset
+ elif size < 0:
+ raise ValueError("size is negative")
+ elif offset + size > n:
+ raise ValueError("buffer length < offset + size")
+ self._send_bytes(m[offset:offset + size])
+
+ def send(self, obj):
+ """Send a (picklable) object"""
+ self._check_closed()
+ self._check_writable()
+ self._send_bytes(_ForkingPickler.dumps(obj))
+
+ def recv_bytes(self, maxlength=None):
+ """
+ Receive bytes data as a bytes object.
+ """
+ self._check_closed()
+ self._check_readable()
+ if maxlength is not None and maxlength < 0:
+ raise ValueError("negative maxlength")
+ buf = self._recv_bytes(maxlength)
+ if buf is None:
+ self._bad_message_length()
+ return buf.getvalue()
+
+ def recv_bytes_into(self, buf, offset=0):
+ """
+ Receive bytes data into a writeable bytes-like object.
+ Return the number of bytes read.
+ """
+ self._check_closed()
+ self._check_readable()
+ with memoryview(buf) as m:
+ # Get bytesize of arbitrary buffer
+ itemsize = m.itemsize
+ bytesize = itemsize * len(m)
+ if offset < 0:
+ raise ValueError("negative offset")
+ elif offset > bytesize:
+ raise ValueError("offset too large")
+ result = self._recv_bytes()
+ size = result.tell()
+ if bytesize < offset + size:
+ raise BufferTooShort(result.getvalue())
+ # Message can fit in dest
+ result.seek(0)
+ result.readinto(m[offset // itemsize :
+ (offset + size) // itemsize])
+ return size
+
+ def recv(self):
+ """Receive a (picklable) object"""
+ self._check_closed()
+ self._check_readable()
+ buf = self._recv_bytes()
+ return _ForkingPickler.loads(buf.getbuffer())
+
+ def poll(self, timeout=0.0):
+ """Whether there is any input available to be read"""
+ self._check_closed()
+ self._check_readable()
+ return self._poll(timeout)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
+
+
+if _winapi:
+
+ class PipeConnection(_ConnectionBase):
+ """
+ Connection class based on a Windows named pipe.
+ Overlapped I/O is used, so the handles must have been created
+ with FILE_FLAG_OVERLAPPED.
+ """
+ _got_empty_message = False
+ _send_ov = None
+
+ def _close(self, _CloseHandle=_winapi.CloseHandle):
+ ov = self._send_ov
+ if ov is not None:
+ # Interrupt WaitForMultipleObjects() in _send_bytes()
+ ov.cancel()
+ _CloseHandle(self._handle)
+
+ def _send_bytes(self, buf):
+ if self._send_ov is not None:
+ # A connection should only be used by a single thread
+ raise ValueError("concurrent send_bytes() calls "
+ "are not supported")
+ ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
+ self._send_ov = ov
+ try:
+ if err == _winapi.ERROR_IO_PENDING:
+ waitres = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ self._send_ov = None
+ nwritten, err = ov.GetOverlappedResult(True)
+ if err == _winapi.ERROR_OPERATION_ABORTED:
+ # close() was called by another thread while
+ # WaitForMultipleObjects() was waiting for the overlapped
+ # operation.
+ raise OSError(errno.EPIPE, "handle is closed")
+ assert err == 0
+ assert nwritten == len(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ if self._got_empty_message:
+ self._got_empty_message = False
+ return io.BytesIO()
+ else:
+ bsize = 128 if maxsize is None else min(maxsize, 128)
+ try:
+ ov, err = _winapi.ReadFile(self._handle, bsize,
+ overlapped=True)
+ try:
+ if err == _winapi.ERROR_IO_PENDING:
+ waitres = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nread, err = ov.GetOverlappedResult(True)
+ if err == 0:
+ f = io.BytesIO()
+ f.write(ov.getbuffer())
+ return f
+ elif err == _winapi.ERROR_MORE_DATA:
+ return self._get_more_data(ov, maxsize)
+ except OSError as e:
+ if e.winerror == _winapi.ERROR_BROKEN_PIPE:
+ raise EOFError
+ else:
+ raise
+ raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
+
+ def _poll(self, timeout):
+ if (self._got_empty_message or
+ _winapi.PeekNamedPipe(self._handle)[0] != 0):
+ return True
+ return bool(wait([self], timeout))
+
+ def _get_more_data(self, ov, maxsize):
+ buf = ov.getbuffer()
+ f = io.BytesIO()
+ f.write(buf)
+ left = _winapi.PeekNamedPipe(self._handle)[1]
+ assert left > 0
+ if maxsize is not None and len(buf) + left > maxsize:
+ self._bad_message_length()
+ ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
+ rbytes, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert rbytes == left
+ f.write(ov.getbuffer())
+ return f
+
+
+class Connection(_ConnectionBase):
+ """
+ Connection class based on an arbitrary file descriptor (Unix only), or
+ a socket handle (Windows).
+ """
+
+ if _winapi:
+ def _close(self, _close=_multiprocessing.closesocket):
+ _close(self._handle)
+ _write = _multiprocessing.send
+ _read = _multiprocessing.recv
+ else:
+ def _close(self, _close=os.close):
+ _close(self._handle)
+ _write = os.write
+ _read = os.read
+
+ def _send(self, buf, write=_write):
+ remaining = len(buf)
+ while True:
+ n = write(self._handle, buf)
+ remaining -= n
+ if remaining == 0:
+ break
+ buf = buf[n:]
+
+ def _recv(self, size, read=_read):
+ buf = io.BytesIO()
+ handle = self._handle
+ remaining = size
+ while remaining > 0:
+ chunk = read(handle, remaining)
+ n = len(chunk)
+ if n == 0:
+ if remaining == size:
+ raise EOFError
+ else:
+ raise OSError("got end of file during message")
+ buf.write(chunk)
+ remaining -= n
+ return buf
+
+ def _send_bytes(self, buf):
+ n = len(buf)
+ if n > 0x7fffffff:
+ pre_header = struct.pack("!i", -1)
+ header = struct.pack("!Q", n)
+ self._send(pre_header)
+ self._send(header)
+ self._send(buf)
+ else:
+ # For wire compatibility with 3.7 and lower
+ header = struct.pack("!i", n)
+ if n > 16384:
+ # The payload is large so Nagle's algorithm won't be triggered
+ # and we'd better avoid the cost of concatenation.
+ self._send(header)
+ self._send(buf)
+ else:
+ # Issue #20540: concatenate before sending, to avoid delays due
+ # to Nagle's algorithm on a TCP socket.
+ # Also note we want to avoid sending a 0-length buffer separately,
+ # to avoid "broken pipe" errors if the other end closed the pipe.
+ self._send(header + buf)
+
+ def _recv_bytes(self, maxsize=None):
+ buf = self._recv(4)
+ size, = struct.unpack("!i", buf.getvalue())
+ if size == -1:
+ buf = self._recv(8)
+ size, = struct.unpack("!Q", buf.getvalue())
+ if maxsize is not None and size > maxsize:
+ return None
+ return self._recv(size)
+
+ def _poll(self, timeout):
+ r = wait([self], timeout)
+ return bool(r)
+
+
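Connection._send_bytes above frames every message with a 4-byte signed big-endian length; a payload over 0x7fffffff bytes is escaped with a -1 prefix followed by an 8-byte unsigned length, and _recv_bytes undoes the same framing. A small sketch of the equivalent framing logic, for illustration only:

    import struct

    def frame(payload: bytes) -> bytes:
        # mirrors _send_bytes: '!i' header, with a -1 + '!Q' escape for
        # payloads that do not fit in a signed 32-bit length field
        n = len(payload)
        if n > 0x7fffffff:
            return struct.pack('!i', -1) + struct.pack('!Q', n) + payload
        return struct.pack('!i', n) + payload

    assert frame(b'abc') == b'\x00\x00\x00\x03abc'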
+#
+# Public functions
+#
+
+class Listener(object):
+ '''
+ Returns a listener object.
+
+ This is a wrapper for a bound socket which is 'listening' for
+ connections, or for a Windows named pipe.
+ '''
+ def __init__(self, address=None, family=None, backlog=1, authkey=None):
+ family = family or (address and address_type(address)) \
+ or default_family
+ address = address or arbitrary_address(family)
+
+ _validate_family(family)
+ if family == 'AF_PIPE':
+ self._listener = PipeListener(address, backlog)
+ else:
+ self._listener = SocketListener(address, family, backlog)
+
+ if authkey is not None and not isinstance(authkey, bytes):
+ raise TypeError('authkey should be a byte string')
+
+ self._authkey = authkey
+
+ def accept(self):
+ '''
+ Accept a connection on the bound socket or named pipe of `self`.
+
+ Returns a `Connection` object.
+ '''
+ if self._listener is None:
+ raise OSError('listener is closed')
+ c = self._listener.accept()
+ if self._authkey:
+ deliver_challenge(c, self._authkey)
+ answer_challenge(c, self._authkey)
+ return c
+
+ def close(self):
+ '''
+ Close the bound socket or named pipe of `self`.
+ '''
+ listener = self._listener
+ if listener is not None:
+ self._listener = None
+ listener.close()
+
+ @property
+ def address(self):
+ return self._listener._address
+
+ @property
+ def last_accepted(self):
+ return self._listener._last_accepted
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
+
+
+def Client(address, family=None, authkey=None):
+ '''
+ Returns a connection to the address of a `Listener`
+ '''
+ family = family or address_type(address)
+ _validate_family(family)
+ if family == 'AF_PIPE':
+ c = PipeClient(address)
+ else:
+ c = SocketClient(address)
+
+ if authkey is not None and not isinstance(authkey, bytes):
+ raise TypeError('authkey should be a byte string')
+
+ if authkey is not None:
+ answer_challenge(c, authkey)
+ deliver_challenge(c, authkey)
+
+ return c
+
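Listener and Client are the two public halves of this API; when authkey is given, the deliver_challenge()/answer_challenge() handshake defined later in this file runs automatically on accept() and connect. A minimal sketch (the address and key are illustrative):

    from multiprocessing.connection import Listener, Client

    address = ('localhost', 6000)  # hypothetical port
    with Listener(address, authkey=b'secret') as listener:
        # in another process: conn = Client(address, authkey=b'secret')
        conn = listener.accept()
        conn.send({'status': 'ok'})
        conn.close()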
+
+if sys.platform != 'win32':
+
+ def Pipe(duplex=True):
+ '''
+ Returns pair of connection objects at either end of a pipe
+ '''
+ if duplex:
+ s1, s2 = socket.socketpair()
+ s1.setblocking(True)
+ s2.setblocking(True)
+ c1 = Connection(s1.detach())
+ c2 = Connection(s2.detach())
+ else:
+ fd1, fd2 = os.pipe()
+ c1 = Connection(fd1, writable=False)
+ c2 = Connection(fd2, readable=False)
+
+ return c1, c2
+
+else:
+
+ def Pipe(duplex=True):
+ '''
+ Returns pair of connection objects at either end of a pipe
+ '''
+ address = arbitrary_address('AF_PIPE')
+ if duplex:
+ openmode = _winapi.PIPE_ACCESS_DUPLEX
+ access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
+ obsize, ibsize = BUFSIZE, BUFSIZE
+ else:
+ openmode = _winapi.PIPE_ACCESS_INBOUND
+ access = _winapi.GENERIC_WRITE
+ obsize, ibsize = 0, BUFSIZE
+
+ h1 = _winapi.CreateNamedPipe(
+ address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
+ _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
+ _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
+ _winapi.PIPE_WAIT,
+ 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
+ # default security descriptor: the handle cannot be inherited
+ _winapi.NULL
+ )
+ h2 = _winapi.CreateFile(
+ address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
+ _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
+ )
+ _winapi.SetNamedPipeHandleState(
+ h2, _winapi.PIPE_READMODE_MESSAGE, None, None
+ )
+
+ overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
+ _, err = overlapped.GetOverlappedResult(True)
+ assert err == 0
+
+ c1 = PipeConnection(h1, writable=duplex)
+ c2 = PipeConnection(h2, readable=duplex)
+
+ return c1, c2
+
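Either way, Pipe() returns a pair of connection objects; with duplex=False the first end is read-only and the second write-only. Sketch:

    from multiprocessing import Pipe

    r, w = Pipe(duplex=False)
    w.send([1, 2, 3])
    print(r.recv())                # [1, 2, 3]
    print(r.readable, r.writable)  # True False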
+#
+# Definitions for connections based on sockets
+#
+
+class SocketListener(object):
+ '''
+ Representation of a socket which is bound to an address and listening
+ '''
+ def __init__(self, address, family, backlog=1):
+ self._socket = socket.socket(getattr(socket, family))
+ try:
+ # SO_REUSEADDR has different semantics on Windows (issue #2550).
+ if os.name == 'posix':
+ self._socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
+ self._socket.setblocking(True)
+ self._socket.bind(address)
+ self._socket.listen(backlog)
+ self._address = self._socket.getsockname()
+ except OSError:
+ self._socket.close()
+ raise
+ self._family = family
+ self._last_accepted = None
+
+ if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
+ # Linux abstract socket namespaces do not need to be explicitly unlinked
+ self._unlink = util.Finalize(
+ self, os.unlink, args=(address,), exitpriority=0
+ )
+ else:
+ self._unlink = None
+
+ def accept(self):
+ s, self._last_accepted = self._socket.accept()
+ s.setblocking(True)
+ return Connection(s.detach())
+
+ def close(self):
+ try:
+ self._socket.close()
+ finally:
+ unlink = self._unlink
+ if unlink is not None:
+ self._unlink = None
+ unlink()
+
+
+def SocketClient(address):
+ '''
+ Return a connection object connected to the socket given by `address`
+ '''
+ family = address_type(address)
+ with socket.socket( getattr(socket, family) ) as s:
+ s.setblocking(True)
+ s.connect(address)
+ return Connection(s.detach())
+
+#
+# Definitions for connections based on named pipes
+#
+
+if sys.platform == 'win32':
+
+ class PipeListener(object):
+ '''
+ Representation of a named pipe
+ '''
+ def __init__(self, address, backlog=None):
+ self._address = address
+ self._handle_queue = [self._new_handle(first=True)]
+
+ self._last_accepted = None
+ util.sub_debug('listener created with address=%r', self._address)
+ self.close = util.Finalize(
+ self, PipeListener._finalize_pipe_listener,
+ args=(self._handle_queue, self._address), exitpriority=0
+ )
+
+ def _new_handle(self, first=False):
+ flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
+ if first:
+ flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
+ return _winapi.CreateNamedPipe(
+ self._address, flags,
+ _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
+ _winapi.PIPE_WAIT,
+ _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
+ _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
+ )
+
+ def accept(self):
+ self._handle_queue.append(self._new_handle())
+ handle = self._handle_queue.pop(0)
+ try:
+ ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
+ except OSError as e:
+ if e.winerror != _winapi.ERROR_NO_DATA:
+ raise
+ # ERROR_NO_DATA can occur if a client has already connected,
+ # written data and then disconnected -- see Issue 14725.
+ else:
+ try:
+ res = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ except:
+ ov.cancel()
+ _winapi.CloseHandle(handle)
+ raise
+ finally:
+ _, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ return PipeConnection(handle)
+
+ @staticmethod
+ def _finalize_pipe_listener(queue, address):
+ util.sub_debug('closing listener with address=%r', address)
+ for handle in queue:
+ _winapi.CloseHandle(handle)
+
+ def PipeClient(address):
+ '''
+ Return a connection object connected to the pipe given by `address`
+ '''
+ t = _init_timeout()
+ while 1:
+ try:
+ _winapi.WaitNamedPipe(address, 1000)
+ h = _winapi.CreateFile(
+ address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
+ 0, _winapi.NULL, _winapi.OPEN_EXISTING,
+ _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
+ )
+ except OSError as e:
+ if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
+ _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
+ raise
+ else:
+ break
+ else:
+ raise
+
+ _winapi.SetNamedPipeHandleState(
+ h, _winapi.PIPE_READMODE_MESSAGE, None, None
+ )
+ return PipeConnection(h)
+
+#
+# Authentication stuff
+#
+
+MESSAGE_LENGTH = 40 # MUST be > 20
+
+_CHALLENGE = b'#CHALLENGE#'
+_WELCOME = b'#WELCOME#'
+_FAILURE = b'#FAILURE#'
+
+# multiprocessing.connection Authentication Handshake Protocol Description
+# (as documented for reference after reading the existing code)
+# =============================================================================
+#
+# On Windows: native pipes with "overlapped IO" are used to send the bytes,
+# instead of the length prefix SIZE scheme described below. (ie: the OS deals
+# with message sizes for us)
+#
+# Protocol error behaviors:
+#
+# On POSIX, any failure to receive the length prefix into SIZE, for SIZE greater
+# than the requested maxsize to receive, or receiving fewer than SIZE bytes
+# results in the connection being closed and auth to fail.
+#
+# On Windows, receiving too few bytes is never a low level _recv_bytes read
+# error, receiving too many will trigger an error only if receive maxsize
+# value was larger than 128 OR if the data arrived in smaller pieces.
+#
+# Serving side Client side
+# ------------------------------ ---------------------------------------
+# 0. Open a connection on the pipe.
+# 1. Accept connection.
+# 2. Random 20+ bytes -> MESSAGE
+# Modern servers always send
+# more than 20 bytes and include
+# a {digest} prefix on it with
+# their preferred HMAC digest.
+# Legacy ones send ==20 bytes.
+# 3. send 4 byte length (net order)
+# prefix followed by:
+# b'#CHALLENGE#' + MESSAGE
+# 4. Receive 4 bytes, parse as network byte
+# order integer. If it is -1, receive an
+# additional 8 bytes, parse that as network
+# byte order. The result is the length of
+# the data that follows -> SIZE.
+# 5. Receive min(SIZE, 256) bytes -> M1
+# 6. Assert that M1 starts with:
+# b'#CHALLENGE#'
+# 7. Strip that prefix from M1 into -> M2
+# 7.1. Parse M2: if it is exactly 20 bytes in
+# length this indicates a legacy server
+# supporting only HMAC-MD5. Otherwise the
+# 7.2. preferred digest is looked up from an
+# expected "{digest}" prefix on M2. No prefix
+# or unsupported digest? <- AuthenticationError
+# 7.3. Put divined algorithm name in -> D_NAME
+# 8. Compute HMAC-D_NAME of AUTHKEY, M2 -> C_DIGEST
+# 9. Send 4 byte length prefix (net order)
+# followed by C_DIGEST bytes.
+# 10. Receive 4 or 4+8 byte length
+# prefix (#4 dance) -> SIZE.
+# 11. Receive min(SIZE, 256) -> C_D.
+# 11.1. Parse C_D: legacy servers
+# accept it as is, "md5" -> D_NAME
+# 11.2. modern servers check the length
+# of C_D, IF it is 16 bytes?
+# 11.2.1. "md5" -> D_NAME
+# and skip to step 12.
+# 11.3. longer? expect and parse a "{digest}"
+# prefix into -> D_NAME.
+# Strip the prefix and store remaining
+# bytes in -> C_D.
+# 11.4. Don't like D_NAME? <- AuthenticationError
+# 12. Compute HMAC-D_NAME of AUTHKEY,
+# MESSAGE into -> M_DIGEST.
+# 13. Compare M_DIGEST == C_D:
+# 14a: Match? Send length prefix &
+# b'#WELCOME#'
+# <- RETURN
+# 14b: Mismatch? Send len prefix &
+# b'#FAILURE#'
+# <- CLOSE & AuthenticationError
+# 15. Receive 4 or 4+8 byte length prefix (net
+# order) again as in #4 into -> SIZE.
+# 16. Receive min(SIZE, 256) bytes -> M3.
+# 17. Compare M3 == b'#WELCOME#':
+# 17a. Match? <- RETURN
+# 17b. Mismatch? <- CLOSE & AuthenticationError
+#
+# If this RETURNed, the connection remains open: it has been authenticated.
+#
+# Length prefixes are used consistently. Even on the legacy protocol, this
+# was good fortune and allowed us to evolve the protocol by using the length
+# of the opening challenge or length of the returned digest as a signal as
+# to which protocol the other end supports.
+
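A worked example of the modern exchange described above, computing both sides' MACs directly with hmac (the key and digest choice are illustrative; the real handshake also adds the length prefixes and the #CHALLENGE#/#WELCOME# markers):

    import hmac, os

    authkey = b'secret'                      # assumed shared key
    message = b'{sha256}' + os.urandom(40)   # step 2: challenge, digest prefix included
    # steps 8-9: client MACs the entire message, prefix and all
    response = b'{sha256}' + hmac.new(authkey, message, 'sha256').digest()
    # steps 12-13: server recomputes and compares in constant time
    expected = b'{sha256}' + hmac.new(authkey, message, 'sha256').digest()
    assert hmac.compare_digest(response, expected)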
+_ALLOWED_DIGESTS = frozenset(
+ {b'md5', b'sha256', b'sha384', b'sha3_256', b'sha3_384'})
+_MAX_DIGEST_LEN = max(len(_) for _ in _ALLOWED_DIGESTS)
+
+# Old hmac-md5 only server versions from Python <=3.11 sent a message of this
+# length. It happens to not match the length of any supported digest so we can
+# use a message of this length to indicate that we should work in backwards
+# compatible md5-only mode without a {digest_name} prefix on our response.
+_MD5ONLY_MESSAGE_LENGTH = 20
+_MD5_DIGEST_LEN = 16
+_LEGACY_LENGTHS = (_MD5ONLY_MESSAGE_LENGTH, _MD5_DIGEST_LEN)
+
+
+def _get_digest_name_and_payload(message: bytes) -> (str, bytes):
+ """Returns a digest name and the payload for a response hash.
+
+ If a legacy protocol is detected based on the message length
+ or contents the digest name returned will be empty to indicate
+ legacy mode where MD5 and no digest prefix should be sent.
+ """
+ # modern message format: b"{digest}payload" longer than 20 bytes
+ # legacy message format: 16 or 20 byte b"payload"
+ if len(message) in _LEGACY_LENGTHS:
+ # Either this was a legacy server challenge, or we're processing
+ # a reply from a legacy client that sent an unprefixed 16-byte
+ # HMAC-MD5 response. All messages using the modern protocol will
+ # be longer than either of these lengths.
+ return '', message
+ if (message.startswith(b'{') and
+ (curly := message.find(b'}', 1, _MAX_DIGEST_LEN+2)) > 0):
+ digest = message[1:curly]
+ if digest in _ALLOWED_DIGESTS:
+ payload = message[curly+1:]
+ return digest.decode('ascii'), payload
+ raise AuthenticationError(
+ 'unsupported message length, missing digest prefix, '
+ f'or unsupported digest: {message=}')
+
+
+def _create_response(authkey, message):
+ """Create a MAC based on authkey and message
+
+ The MAC algorithm defaults to HMAC-MD5, unless MD5 is not available or
+ the message has a '{digest_name}' prefix. For legacy HMAC-MD5, the response
+ is the raw MAC, otherwise the response is prefixed with '{digest_name}',
+ e.g. b'{sha256}abcdefg...'
+
+ Note: The MAC protects the entire message including the digest_name prefix.
+ """
+ import hmac
+ digest_name = _get_digest_name_and_payload(message)[0]
+ # The MAC protects the entire message: digest header and payload.
+ if not digest_name:
+ # Legacy server without a {digest} prefix on message.
+ # Generate a legacy non-prefixed HMAC-MD5 reply.
+ try:
+ return hmac.new(authkey, message, 'md5').digest()
+ except ValueError:
+ # HMAC-MD5 is not available (FIPS mode?), fall back to
+ # HMAC-SHA2-256 modern protocol. The legacy server probably
+ # doesn't support it and will reject us anyways. :shrug:
+ digest_name = 'sha256'
+ # Modern protocol, indicate the digest used in the reply.
+ response = hmac.new(authkey, message, digest_name).digest()
+ return b'{%s}%s' % (digest_name.encode('ascii'), response)
+
+
+def _verify_challenge(authkey, message, response):
+ """Verify MAC challenge
+
+ If our message did not include a digest_name prefix, the client is allowed
+ to select a stronger digest_name from _ALLOWED_DIGESTS.
+
+ In case our message is prefixed, a client cannot downgrade to a weaker
+ algorithm, because the MAC is calculated over the entire message
+ including the '{digest_name}' prefix.
+ """
+ import hmac
+ response_digest, response_mac = _get_digest_name_and_payload(response)
+ response_digest = response_digest or 'md5'
+ try:
+ expected = hmac.new(authkey, message, response_digest).digest()
+ except ValueError:
+ raise AuthenticationError(f'{response_digest=} unsupported')
+ if len(expected) != len(response_mac):
+ raise AuthenticationError(
+ f'expected {response_digest!r} of length {len(expected)} '
+ f'got {len(response_mac)}')
+ if not hmac.compare_digest(expected, response_mac):
+ raise AuthenticationError('digest received was wrong')
+
+
+def deliver_challenge(connection, authkey: bytes, digest_name='sha256'):
+ if not isinstance(authkey, bytes):
+ raise ValueError(
+ "Authkey must be bytes, not {0!s}".format(type(authkey)))
+ assert MESSAGE_LENGTH > _MD5ONLY_MESSAGE_LENGTH, "protocol constraint"
+ message = os.urandom(MESSAGE_LENGTH)
+ message = b'{%s}%s' % (digest_name.encode('ascii'), message)
+ # Even when sending a challenge to a legacy client that does not support
+ # digest prefixes, they'll take the entire thing as a challenge and
+ # respond to it with a raw HMAC-MD5.
+ connection.send_bytes(_CHALLENGE + message)
+ response = connection.recv_bytes(256) # reject large message
+ try:
+ _verify_challenge(authkey, message, response)
+ except AuthenticationError:
+ connection.send_bytes(_FAILURE)
+ raise
+ else:
+ connection.send_bytes(_WELCOME)
+
+
+def answer_challenge(connection, authkey: bytes):
+ if not isinstance(authkey, bytes):
+ raise ValueError(
+ "Authkey must be bytes, not {0!s}".format(type(authkey)))
+ message = connection.recv_bytes(256) # reject large message
+ if not message.startswith(_CHALLENGE):
+ raise AuthenticationError(
+ f'Protocol error, expected challenge: {message=}')
+ message = message[len(_CHALLENGE):]
+ if len(message) < _MD5ONLY_MESSAGE_LENGTH:
+ raise AuthenticationError(f'challenge too short: {len(message)} bytes')
+ digest = _create_response(authkey, message)
+ connection.send_bytes(digest)
+ response = connection.recv_bytes(256) # reject large message
+ if response != _WELCOME:
+ raise AuthenticationError('digest sent was rejected')
+
+#
+# Support for using xmlrpclib for serialization
+#
+
+class ConnectionWrapper(object):
+ def __init__(self, conn, dumps, loads):
+ self._conn = conn
+ self._dumps = dumps
+ self._loads = loads
+ for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
+ obj = getattr(conn, attr)
+ setattr(self, attr, obj)
+ def send(self, obj):
+ s = self._dumps(obj)
+ self._conn.send_bytes(s)
+ def recv(self):
+ s = self._conn.recv_bytes()
+ return self._loads(s)
+
+def _xml_dumps(obj):
+ return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
+
+def _xml_loads(s):
+ (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
+ return obj
+
+class XmlListener(Listener):
+ def accept(self):
+ global xmlrpclib
+ import xmlrpc.client as xmlrpclib
+ obj = Listener.accept(self)
+ return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
+
+def XmlClient(*args, **kwds):
+ global xmlrpclib
+ import xmlrpc.client as xmlrpclib
+ return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+
+#
+# Wait
+#
+
+if sys.platform == 'win32':
+
+ def _exhaustive_wait(handles, timeout):
+ # Return ALL handles which are currently signalled. (Only
+ # returning the first signalled might create starvation issues.)
+ L = list(handles)
+ ready = []
+ while L:
+ res = _winapi.WaitForMultipleObjects(L, False, timeout)
+ if res == WAIT_TIMEOUT:
+ break
+ elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
+ res -= WAIT_OBJECT_0
+ elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
+ res -= WAIT_ABANDONED_0
+ else:
+ raise RuntimeError('Should not get here')
+ ready.append(L[res])
+ L = L[res+1:]
+ timeout = 0
+ return ready
+
+ _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is None:
+ timeout = INFINITE
+ elif timeout < 0:
+ timeout = 0
+ else:
+ timeout = int(timeout * 1000 + 0.5)
+
+ object_list = list(object_list)
+ waithandle_to_obj = {}
+ ov_list = []
+ ready_objects = set()
+ ready_handles = set()
+
+ try:
+ for o in object_list:
+ try:
+ fileno = getattr(o, 'fileno')
+ except AttributeError:
+ waithandle_to_obj[o.__index__()] = o
+ else:
+ # start an overlapped read of length zero
+ try:
+ ov, err = _winapi.ReadFile(fileno(), 0, True)
+ except OSError as e:
+ ov, err = None, e.winerror
+ if err not in _ready_errors:
+ raise
+ if err == _winapi.ERROR_IO_PENDING:
+ ov_list.append(ov)
+ waithandle_to_obj[ov.event] = o
+ else:
+ # If o.fileno() is an overlapped pipe handle and
+ # err == 0 then there is a zero length message
+ # in the pipe, but it HAS NOT been consumed...
+ if ov and sys.getwindowsversion()[:2] >= (6, 2):
+ # ... except on Windows 8 and later, where
+ # the message HAS been consumed.
+ try:
+ _, err = ov.GetOverlappedResult(False)
+ except OSError as e:
+ err = e.winerror
+ if not err and hasattr(o, '_got_empty_message'):
+ o._got_empty_message = True
+ ready_objects.add(o)
+ timeout = 0
+
+ ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
+ finally:
+ # request that overlapped reads stop
+ for ov in ov_list:
+ ov.cancel()
+
+ # wait for all overlapped reads to stop
+ for ov in ov_list:
+ try:
+ _, err = ov.GetOverlappedResult(True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err != _winapi.ERROR_OPERATION_ABORTED:
+ o = waithandle_to_obj[ov.event]
+ ready_objects.add(o)
+ if err == 0:
+ # If o.fileno() is an overlapped pipe handle then
+ # a zero length message HAS been consumed.
+ if hasattr(o, '_got_empty_message'):
+ o._got_empty_message = True
+
+ ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
+ return [o for o in object_list if o in ready_objects]
+
+else:
+
+ import selectors
+
+ # poll/select have the advantage of not requiring any extra file
+ # descriptor, contrarily to epoll/kqueue (also, they require a single
+ # syscall).
+ if hasattr(selectors, 'PollSelector'):
+ _WaitSelector = selectors.PollSelector
+ else:
+ _WaitSelector = selectors.SelectSelector
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ with _WaitSelector() as selector:
+ for obj in object_list:
+ selector.register(obj, selectors.EVENT_READ)
+
+ if timeout is not None:
+ deadline = time.monotonic() + timeout
+
+ while True:
+ ready = selector.select(timeout)
+ if ready:
+ return [key.fileobj for (key, events) in ready]
+ else:
+ if timeout is not None:
+ timeout = deadline - time.monotonic()
+ if timeout < 0:
+ return ready
+
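On both platforms wait() takes a mixed list of connections, sockets, or handles and returns the subset that is ready; timeout=None blocks indefinitely. Sketch:

    from multiprocessing import Pipe
    from multiprocessing.connection import wait

    a, b = Pipe()
    b.send('ping')
    for conn in wait([a], timeout=1.0):  # returns [a] once data arrives
        print(conn.recv())               # 'ping'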
+#
+# Make connection and socket objects shareable if possible
+#
+
+if sys.platform == 'win32':
+ def reduce_connection(conn):
+ handle = conn.fileno()
+ with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+ from . import resource_sharer
+ ds = resource_sharer.DupSocket(s)
+ return rebuild_connection, (ds, conn.readable, conn.writable)
+ def rebuild_connection(ds, readable, writable):
+ sock = ds.detach()
+ return Connection(sock.detach(), readable, writable)
+ reduction.register(Connection, reduce_connection)
+
+ def reduce_pipe_connection(conn):
+ access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+ (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+ dh = reduction.DupHandle(conn.fileno(), access)
+ return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+ def rebuild_pipe_connection(dh, readable, writable):
+ handle = dh.detach()
+ return PipeConnection(handle, readable, writable)
+ reduction.register(PipeConnection, reduce_pipe_connection)
+
+else:
+ def reduce_connection(conn):
+ df = reduction.DupFd(conn.fileno())
+ return rebuild_connection, (df, conn.readable, conn.writable)
+ def rebuild_connection(df, readable, writable):
+ fd = df.detach()
+ return Connection(fd, readable, writable)
+ reduction.register(Connection, reduce_connection)
diff --git a/contrib/tools/python3/Lib/multiprocessing/context.py b/contrib/tools/python3/Lib/multiprocessing/context.py
new file mode 100644
index 0000000000..de8a264829
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/context.py
@@ -0,0 +1,377 @@
+import os
+import sys
+import threading
+
+from . import process
+from . import reduction
+
+__all__ = ()
+
+#
+# Exceptions
+#
+
+class ProcessError(Exception):
+ pass
+
+class BufferTooShort(ProcessError):
+ pass
+
+class TimeoutError(ProcessError):
+ pass
+
+class AuthenticationError(ProcessError):
+ pass
+
+#
+# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
+#
+
+class BaseContext(object):
+
+ ProcessError = ProcessError
+ BufferTooShort = BufferTooShort
+ TimeoutError = TimeoutError
+ AuthenticationError = AuthenticationError
+
+ current_process = staticmethod(process.current_process)
+ parent_process = staticmethod(process.parent_process)
+ active_children = staticmethod(process.active_children)
+
+ def cpu_count(self):
+ '''Returns the number of CPUs in the system'''
+ num = os.cpu_count()
+ if num is None:
+ raise NotImplementedError('cannot determine number of cpus')
+ else:
+ return num
+
+ def Manager(self):
+ '''Returns a manager associated with a running server process
+
+ The manager's methods such as `Lock()`, `Condition()` and `Queue()`
+ can be used to create shared objects.
+ '''
+ from .managers import SyncManager
+ m = SyncManager(ctx=self.get_context())
+ m.start()
+ return m
+
+ def Pipe(self, duplex=True):
+ '''Returns two connection objects connected by a pipe'''
+ from .connection import Pipe
+ return Pipe(duplex)
+
+ def Lock(self):
+ '''Returns a non-recursive lock object'''
+ from .synchronize import Lock
+ return Lock(ctx=self.get_context())
+
+ def RLock(self):
+ '''Returns a recursive lock object'''
+ from .synchronize import RLock
+ return RLock(ctx=self.get_context())
+
+ def Condition(self, lock=None):
+ '''Returns a condition object'''
+ from .synchronize import Condition
+ return Condition(lock, ctx=self.get_context())
+
+ def Semaphore(self, value=1):
+ '''Returns a semaphore object'''
+ from .synchronize import Semaphore
+ return Semaphore(value, ctx=self.get_context())
+
+ def BoundedSemaphore(self, value=1):
+ '''Returns a bounded semaphore object'''
+ from .synchronize import BoundedSemaphore
+ return BoundedSemaphore(value, ctx=self.get_context())
+
+ def Event(self):
+ '''Returns an event object'''
+ from .synchronize import Event
+ return Event(ctx=self.get_context())
+
+ def Barrier(self, parties, action=None, timeout=None):
+ '''Returns a barrier object'''
+ from .synchronize import Barrier
+ return Barrier(parties, action, timeout, ctx=self.get_context())
+
+ def Queue(self, maxsize=0):
+ '''Returns a queue object'''
+ from .queues import Queue
+ return Queue(maxsize, ctx=self.get_context())
+
+ def JoinableQueue(self, maxsize=0):
+ '''Returns a queue object'''
+ from .queues import JoinableQueue
+ return JoinableQueue(maxsize, ctx=self.get_context())
+
+ def SimpleQueue(self):
+ '''Returns a queue object'''
+ from .queues import SimpleQueue
+ return SimpleQueue(ctx=self.get_context())
+
+ def Pool(self, processes=None, initializer=None, initargs=(),
+ maxtasksperchild=None):
+ '''Returns a process pool object'''
+ from .pool import Pool
+ return Pool(processes, initializer, initargs, maxtasksperchild,
+ context=self.get_context())
+
+ def RawValue(self, typecode_or_type, *args):
+ '''Returns a shared object'''
+ from .sharedctypes import RawValue
+ return RawValue(typecode_or_type, *args)
+
+ def RawArray(self, typecode_or_type, size_or_initializer):
+ '''Returns a shared array'''
+ from .sharedctypes import RawArray
+ return RawArray(typecode_or_type, size_or_initializer)
+
+ def Value(self, typecode_or_type, *args, lock=True):
+ '''Returns a synchronized shared object'''
+ from .sharedctypes import Value
+ return Value(typecode_or_type, *args, lock=lock,
+ ctx=self.get_context())
+
+ def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
+ '''Returns a synchronized shared array'''
+ from .sharedctypes import Array
+ return Array(typecode_or_type, size_or_initializer, lock=lock,
+ ctx=self.get_context())
+
+ def freeze_support(self):
+ '''Check whether this is a fake forked process in a frozen executable.
+ If so then run code specified by commandline and exit.
+ '''
+ if sys.platform == 'win32' and getattr(sys, 'frozen', False):
+ from .spawn import freeze_support
+ freeze_support()
+
+ def get_logger(self):
+ '''Return package logger -- if it does not already exist then
+ it is created.
+ '''
+ from .util import get_logger
+ return get_logger()
+
+ def log_to_stderr(self, level=None):
+ '''Turn on logging and add a handler which prints to stderr'''
+ from .util import log_to_stderr
+ return log_to_stderr(level)
+
+ def allow_connection_pickling(self):
+ '''Install support for sending connections and sockets
+ between processes
+ '''
+ # This is undocumented. In previous versions of multiprocessing
+ # its only effect was to make socket objects inheritable on Windows.
+ from . import connection
+
+ def set_executable(self, executable):
+ '''Sets the path to a python.exe or pythonw.exe binary used to run
+ child processes instead of sys.executable when using the 'spawn'
+ start method. Useful for people embedding Python.
+ '''
+ from .spawn import set_executable
+ set_executable(executable)
+
+ def set_forkserver_preload(self, module_names):
+ '''Set list of module names to try to load in forkserver process.
+ This is really just a hint.
+ '''
+ from .forkserver import set_forkserver_preload
+ set_forkserver_preload(module_names)
+
+ def get_context(self, method=None):
+ if method is None:
+ return self
+ try:
+ ctx = _concrete_contexts[method]
+ except KeyError:
+ raise ValueError('cannot find context for %r' % method) from None
+ ctx._check_available()
+ return ctx
+
+ def get_start_method(self, allow_none=False):
+ return self._name
+
+ def set_start_method(self, method, force=False):
+ raise ValueError('cannot set start method of concrete context')
+
+ @property
+ def reducer(self):
+ '''Controls how objects will be reduced to a form that can be
+ shared with other processes.'''
+ return globals().get('reduction')
+
+ @reducer.setter
+ def reducer(self, reduction):
+ globals()['reduction'] = reduction
+
+ def _check_available(self):
+ pass
+
+#
+# Type of default context -- underlying context can be set at most once
+#
+
+class Process(process.BaseProcess):
+ _start_method = None
+ @staticmethod
+ def _Popen(process_obj):
+ return _default_context.get_context().Process._Popen(process_obj)
+
+ @staticmethod
+ def _after_fork():
+ return _default_context.get_context().Process._after_fork()
+
+class DefaultContext(BaseContext):
+ Process = Process
+
+ def __init__(self, context):
+ self._default_context = context
+ self._actual_context = None
+
+ def get_context(self, method=None):
+ if method is None:
+ if self._actual_context is None:
+ self._actual_context = self._default_context
+ return self._actual_context
+ else:
+ return super().get_context(method)
+
+ def set_start_method(self, method, force=False):
+ if self._actual_context is not None and not force:
+ raise RuntimeError('context has already been set')
+ if method is None and force:
+ self._actual_context = None
+ return
+ self._actual_context = self.get_context(method)
+
+ def get_start_method(self, allow_none=False):
+ if self._actual_context is None:
+ if allow_none:
+ return None
+ self._actual_context = self._default_context
+ return self._actual_context._name
+
+ def get_all_start_methods(self):
+ """Returns a list of the supported start methods, default first."""
+ if sys.platform == 'win32':
+ return ['spawn']
+ else:
+ methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn']
+ if reduction.HAVE_SEND_HANDLE:
+ methods.append('forkserver')
+ return methods
+
+
+#
+# Context types for fixed start method
+#
+
+if sys.platform != 'win32':
+
+ class ForkProcess(process.BaseProcess):
+ _start_method = 'fork'
+ @staticmethod
+ def _Popen(process_obj):
+ from .popen_fork import Popen
+ return Popen(process_obj)
+
+ class SpawnProcess(process.BaseProcess):
+ _start_method = 'spawn'
+ @staticmethod
+ def _Popen(process_obj):
+ from .popen_spawn_posix import Popen
+ return Popen(process_obj)
+
+ @staticmethod
+ def _after_fork():
+ # process is spawned, nothing to do
+ pass
+
+ class ForkServerProcess(process.BaseProcess):
+ _start_method = 'forkserver'
+ @staticmethod
+ def _Popen(process_obj):
+ from .popen_forkserver import Popen
+ return Popen(process_obj)
+
+ class ForkContext(BaseContext):
+ _name = 'fork'
+ Process = ForkProcess
+
+ class SpawnContext(BaseContext):
+ _name = 'spawn'
+ Process = SpawnProcess
+
+ class ForkServerContext(BaseContext):
+ _name = 'forkserver'
+ Process = ForkServerProcess
+ def _check_available(self):
+ if not reduction.HAVE_SEND_HANDLE:
+ raise ValueError('forkserver start method not available')
+
+ _concrete_contexts = {
+ 'fork': ForkContext(),
+ 'spawn': SpawnContext(),
+ 'forkserver': ForkServerContext(),
+ }
+ if sys.platform == 'darwin':
+ # bpo-33725: running arbitrary code after fork() is no longer reliable
+ # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
+ _default_context = DefaultContext(_concrete_contexts['spawn'])
+ else:
+ _default_context = DefaultContext(_concrete_contexts['fork'])
+
+else:
+
+ class SpawnProcess(process.BaseProcess):
+ _start_method = 'spawn'
+ @staticmethod
+ def _Popen(process_obj):
+ from .popen_spawn_win32 import Popen
+ return Popen(process_obj)
+
+ @staticmethod
+ def _after_fork():
+ # process is spawned, nothing to do
+ pass
+
+ class SpawnContext(BaseContext):
+ _name = 'spawn'
+ Process = SpawnProcess
+
+ _concrete_contexts = {
+ 'spawn': SpawnContext(),
+ }
+ _default_context = DefaultContext(_concrete_contexts['spawn'])
+
+#
+# Force the start method
+#
+
+def _force_start_method(method):
+ _default_context._actual_context = _concrete_contexts[method]
+
+#
+# Check that the current thread is spawning a child process
+#
+
+_tls = threading.local()
+
+def get_spawning_popen():
+ return getattr(_tls, 'spawning_popen', None)
+
+def set_spawning_popen(popen):
+ _tls.spawning_popen = popen
+
+def assert_spawning(obj):
+ if get_spawning_popen() is None:
+ raise RuntimeError(
+ '%s objects should only be shared between processes'
+ ' through inheritance' % type(obj).__name__
+ )
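The context machinery above is what multiprocessing.get_context() and set_start_method() expose; a concrete context carries the whole multiprocessing API bound to one start method. Sketch:

    import multiprocessing as mp

    if __name__ == '__main__':
        print(mp.get_all_start_methods())  # e.g. ['fork', 'spawn', 'forkserver'] on Linux
        ctx = mp.get_context('spawn')      # independent of the process-wide default
        q = ctx.Queue()
        p = ctx.Process(target=q.put, args=('done',))
        p.start()
        print(q.get())
        p.join()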
diff --git a/contrib/tools/python3/Lib/multiprocessing/dummy/__init__.py b/contrib/tools/python3/Lib/multiprocessing/dummy/__init__.py
new file mode 100644
index 0000000000..6a1468609e
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/dummy/__init__.py
@@ -0,0 +1,126 @@
+#
+# Support for the API of the multiprocessing package using threads
+#
+# multiprocessing/dummy/__init__.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+__all__ = [
+ 'Process', 'current_process', 'active_children', 'freeze_support',
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
+ 'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
+ ]
+
+#
+# Imports
+#
+
+import threading
+import sys
+import weakref
+import array
+
+from .connection import Pipe
+from threading import Lock, RLock, Semaphore, BoundedSemaphore
+from threading import Event, Condition, Barrier
+from queue import Queue
+
+#
+#
+#
+
+class DummyProcess(threading.Thread):
+
+ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
+ threading.Thread.__init__(self, group, target, name, args, kwargs)
+ self._pid = None
+ self._children = weakref.WeakKeyDictionary()
+ self._start_called = False
+ self._parent = current_process()
+
+ def start(self):
+ if self._parent is not current_process():
+ raise RuntimeError(
+ "Parent is {0!r} but current_process is {1!r}".format(
+ self._parent, current_process()))
+ self._start_called = True
+ if hasattr(self._parent, '_children'):
+ self._parent._children[self] = None
+ threading.Thread.start(self)
+
+ @property
+ def exitcode(self):
+ if self._start_called and not self.is_alive():
+ return 0
+ else:
+ return None
+
+#
+#
+#
+
+Process = DummyProcess
+current_process = threading.current_thread
+current_process()._children = weakref.WeakKeyDictionary()
+
+def active_children():
+ children = current_process()._children
+ for p in list(children):
+ if not p.is_alive():
+ children.pop(p, None)
+ return list(children)
+
+def freeze_support():
+ pass
+
+#
+#
+#
+
+class Namespace(object):
+ def __init__(self, /, **kwds):
+ self.__dict__.update(kwds)
+ def __repr__(self):
+ items = list(self.__dict__.items())
+ temp = []
+ for name, value in items:
+ if not name.startswith('_'):
+ temp.append('%s=%r' % (name, value))
+ temp.sort()
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
+
+dict = dict
+list = list
+
+def Array(typecode, sequence, lock=True):
+ return array.array(typecode, sequence)
+
+class Value(object):
+ def __init__(self, typecode, value, lock=True):
+ self._typecode = typecode
+ self._value = value
+
+ @property
+ def value(self):
+ return self._value
+
+ @value.setter
+ def value(self, value):
+ self._value = value
+
+ def __repr__(self):
+ return '<%s(%r, %r)>'%(type(self).__name__,self._typecode,self._value)
+
+def Manager():
+ return sys.modules[__name__]
+
+def shutdown():
+ pass
+
+def Pool(processes=None, initializer=None, initargs=()):
+ from ..pool import ThreadPool
+ return ThreadPool(processes, initializer, initargs)
+
+JoinableQueue = Queue
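Everything in multiprocessing.dummy is thread-backed, so Pool here is the ThreadPool and Process is a thin Thread subclass. Sketch:

    from multiprocessing.dummy import Pool  # threads, same Pool API

    with Pool(4) as pool:
        print(pool.map(len, ['a', 'bb', 'ccc']))  # [1, 2, 3]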
diff --git a/contrib/tools/python3/Lib/multiprocessing/dummy/connection.py b/contrib/tools/python3/Lib/multiprocessing/dummy/connection.py
new file mode 100644
index 0000000000..f0ce320fcf
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/dummy/connection.py
@@ -0,0 +1,75 @@
+#
+# Analogue of `multiprocessing.connection` which uses queues instead of sockets
+#
+# multiprocessing/dummy/connection.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+__all__ = [ 'Client', 'Listener', 'Pipe' ]
+
+from queue import Queue
+
+
+families = [None]
+
+
+class Listener(object):
+
+ def __init__(self, address=None, family=None, backlog=1):
+ self._backlog_queue = Queue(backlog)
+
+ def accept(self):
+ return Connection(*self._backlog_queue.get())
+
+ def close(self):
+ self._backlog_queue = None
+
+ @property
+ def address(self):
+ return self._backlog_queue
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
+
+
+def Client(address):
+ _in, _out = Queue(), Queue()
+ address.put((_out, _in))
+ return Connection(_in, _out)
+
+
+def Pipe(duplex=True):
+ a, b = Queue(), Queue()
+ return Connection(a, b), Connection(b, a)
+
+
+class Connection(object):
+
+ def __init__(self, _in, _out):
+ self._out = _out
+ self._in = _in
+ self.send = self.send_bytes = _out.put
+ self.recv = self.recv_bytes = _in.get
+
+ def poll(self, timeout=0.0):
+ if self._in.qsize() > 0:
+ return True
+ if timeout <= 0.0:
+ return False
+ with self._in.not_empty:
+ self._in.not_empty.wait(timeout)
+ return self._in.qsize() > 0
+
+ def close(self):
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
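Since this Connection is just a pair of cross-wired queues, Pipe() gives two in-process endpoints that are handy between threads:

    from multiprocessing.dummy.connection import Pipe

    a, b = Pipe()
    a.send('hello')  # delegates to Queue.put
    print(b.recv())  # delegates to Queue.get -> 'hello'
    print(b.poll())  # False once drained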
diff --git a/contrib/tools/python3/Lib/multiprocessing/forkserver.py b/contrib/tools/python3/Lib/multiprocessing/forkserver.py
new file mode 100644
index 0000000000..4642707dae
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/forkserver.py
@@ -0,0 +1,348 @@
+import errno
+import os
+import selectors
+import signal
+import socket
+import struct
+import sys
+import threading
+import warnings
+
+from . import connection
+from . import process
+from .context import reduction
+from . import resource_tracker
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
+ 'set_forkserver_preload']
+
+#
+#
+#
+
+MAXFDS_TO_SEND = 256
+SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t
+
+#
+# Forkserver class
+#
+
+class ForkServer(object):
+
+ def __init__(self):
+ self._forkserver_address = None
+ self._forkserver_alive_fd = None
+ self._forkserver_pid = None
+ self._inherited_fds = None
+ self._lock = threading.Lock()
+ self._preload_modules = ['__main__']
+
+ def _stop(self):
+ # Method used by unit tests to stop the server
+ with self._lock:
+ self._stop_unlocked()
+
+ def _stop_unlocked(self):
+ if self._forkserver_pid is None:
+ return
+
+ # closing the "alive" file descriptor asks the server to stop
+ os.close(self._forkserver_alive_fd)
+ self._forkserver_alive_fd = None
+
+ os.waitpid(self._forkserver_pid, 0)
+ self._forkserver_pid = None
+
+ if not util.is_abstract_socket_namespace(self._forkserver_address):
+ os.unlink(self._forkserver_address)
+ self._forkserver_address = None
+
+ def set_forkserver_preload(self, modules_names):
+ '''Set list of module names to try to load in forkserver process.'''
+ if not all(type(mod) is str for mod in modules_names):
+ raise TypeError('modules_names must be a list of strings')
+ self._preload_modules = modules_names
+
+ def get_inherited_fds(self):
+ '''Return list of fds inherited from parent process.
+
+ This returns None if the current process was not started by fork
+ server.
+ '''
+ return self._inherited_fds
+
+ def connect_to_new_process(self, fds):
+ '''Request forkserver to create a child process.
+
+ Returns a pair of fds (status_r, data_w). The calling process can read
+ the child process's pid and (eventually) its returncode from status_r.
+ The calling process should write to data_w the pickled preparation and
+ process data.
+ '''
+ self.ensure_running()
+ if len(fds) + 4 >= MAXFDS_TO_SEND:
+ raise ValueError('too many fds')
+ with socket.socket(socket.AF_UNIX) as client:
+ client.connect(self._forkserver_address)
+ parent_r, child_w = os.pipe()
+ child_r, parent_w = os.pipe()
+ allfds = [child_r, child_w, self._forkserver_alive_fd,
+ resource_tracker.getfd()]
+ allfds += fds
+ try:
+ reduction.sendfds(client, allfds)
+ return parent_r, parent_w
+ except:
+ os.close(parent_r)
+ os.close(parent_w)
+ raise
+ finally:
+ os.close(child_r)
+ os.close(child_w)
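+
+ # Illustrative sketch (not part of the upstream API): a caller consumes
+ # the returned pair roughly as follows, where `fs` is a ForkServer
+ # instance and `buf` holds the pickled preparation and process data
+ # (hypothetical names):
+ #
+ #     status_r, data_w = fs.connect_to_new_process([])
+ #     os.write(data_w, buf)
+ #     pid = read_signed(status_r)          # written by main() below
+ #     returncode = read_signed(status_r)   # written when the child exits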
+
+ def ensure_running(self):
+ '''Make sure that a fork server is running.
+
+ This can be called from any process. Note that usually a child
+ process will just reuse the forkserver started by its parent, so
+ ensure_running() will do nothing.
+ '''
+ with self._lock:
+ resource_tracker.ensure_running()
+ if self._forkserver_pid is not None:
+ # forkserver was launched before, is it still running?
+ pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
+ if not pid:
+ # still alive
+ return
+ # dead, launch it again
+ os.close(self._forkserver_alive_fd)
+ self._forkserver_address = None
+ self._forkserver_alive_fd = None
+ self._forkserver_pid = None
+
+ cmd = ('from multiprocessing.forkserver import main; ' +
+ 'main(%d, %d, %r, **%r)')
+
+ if self._preload_modules:
+ desired_keys = {'main_path', 'sys_path'}
+ data = spawn.get_preparation_data('ignore')
+ data = {x: y for x, y in data.items() if x in desired_keys}
+ else:
+ data = {}
+
+ with socket.socket(socket.AF_UNIX) as listener:
+ address = connection.arbitrary_address('AF_UNIX')
+ listener.bind(address)
+ if not util.is_abstract_socket_namespace(address):
+ os.chmod(address, 0o600)
+ listener.listen()
+
+ # all client processes own the write end of the "alive" pipe;
+ # when they all terminate the read end becomes ready.
+ alive_r, alive_w = os.pipe()
+ try:
+ fds_to_pass = [listener.fileno(), alive_r]
+ cmd %= (listener.fileno(), alive_r, self._preload_modules,
+ data)
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd]
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ except:
+ os.close(alive_w)
+ raise
+ finally:
+ os.close(alive_r)
+ self._forkserver_address = address
+ self._forkserver_alive_fd = alive_w
+ self._forkserver_pid = pid
+
+#
+#
+#
+
+def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
+ '''Run forkserver.'''
+ if preload:
+ if '__main__' in preload and main_path is not None:
+ process.current_process()._inheriting = True
+ try:
+ spawn.import_main_path(main_path)
+ finally:
+ del process.current_process()._inheriting
+ for modname in preload:
+ try:
+ __import__(modname)
+ except ImportError:
+ pass
+
+ util._close_stdin()
+
+ sig_r, sig_w = os.pipe()
+ os.set_blocking(sig_r, False)
+ os.set_blocking(sig_w, False)
+
+ def sigchld_handler(*_unused):
+ # Dummy signal handler, doesn't do anything
+ pass
+
+ handlers = {
+ # unblocking SIGCHLD allows the wakeup fd to notify our event loop
+ signal.SIGCHLD: sigchld_handler,
+ # protect the process from ^C
+ signal.SIGINT: signal.SIG_IGN,
+ }
+ old_handlers = {sig: signal.signal(sig, val)
+ for (sig, val) in handlers.items()}
+
+ # calling os.write() in the Python signal handler is racy
+ signal.set_wakeup_fd(sig_w)
+
+ # map child pids to client fds
+ pid_to_fd = {}
+
+ with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
+ selectors.DefaultSelector() as selector:
+ _forkserver._forkserver_address = listener.getsockname()
+
+ selector.register(listener, selectors.EVENT_READ)
+ selector.register(alive_r, selectors.EVENT_READ)
+ selector.register(sig_r, selectors.EVENT_READ)
+
+ while True:
+ try:
+ while True:
+ rfds = [key.fileobj for (key, events) in selector.select()]
+ if rfds:
+ break
+
+ if alive_r in rfds:
+ # EOF because no more client processes left
+ assert os.read(alive_r, 1) == b'', "Not at EOF?"
+ raise SystemExit
+
+ if sig_r in rfds:
+ # Got SIGCHLD
+ os.read(sig_r, 65536) # exhaust
+ while True:
+ # Scan for child processes
+ try:
+ pid, sts = os.waitpid(-1, os.WNOHANG)
+ except ChildProcessError:
+ break
+ if pid == 0:
+ break
+ child_w = pid_to_fd.pop(pid, None)
+ if child_w is not None:
+ returncode = os.waitstatus_to_exitcode(sts)
+
+ # Send exit code to client process
+ try:
+ write_signed(child_w, returncode)
+ except BrokenPipeError:
+ # client vanished
+ pass
+ os.close(child_w)
+ else:
+ # This really shouldn't happen
+ warnings.warn('forkserver: waitpid returned '
+ 'unexpected pid %d' % pid)
+
+ if listener in rfds:
+ # Incoming fork request
+ with listener.accept()[0] as s:
+ # Receive fds from client
+ fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
+ if len(fds) > MAXFDS_TO_SEND:
+ raise RuntimeError(
+ "Too many ({0:n}) fds to send".format(
+ len(fds)))
+ child_r, child_w, *fds = fds
+ s.close()
+ pid = os.fork()
+ if pid == 0:
+ # Child
+ code = 1
+ try:
+ listener.close()
+ selector.close()
+ unused_fds = [alive_r, child_w, sig_r, sig_w]
+ unused_fds.extend(pid_to_fd.values())
+ code = _serve_one(child_r, fds,
+ unused_fds,
+ old_handlers)
+ except Exception:
+ sys.excepthook(*sys.exc_info())
+ sys.stderr.flush()
+ finally:
+ os._exit(code)
+ else:
+ # Send pid to client process
+ try:
+ write_signed(child_w, pid)
+ except BrokenPipeError:
+ # client vanished
+ pass
+ pid_to_fd[pid] = child_w
+ os.close(child_r)
+ for fd in fds:
+ os.close(fd)
+
+ except OSError as e:
+ if e.errno != errno.ECONNABORTED:
+ raise
+
+
+def _serve_one(child_r, fds, unused_fds, handlers):
+ # close unnecessary stuff and reset signal handlers
+ signal.set_wakeup_fd(-1)
+ for sig, val in handlers.items():
+ signal.signal(sig, val)
+ for fd in unused_fds:
+ os.close(fd)
+
+ (_forkserver._forkserver_alive_fd,
+ resource_tracker._resource_tracker._fd,
+ *_forkserver._inherited_fds) = fds
+
+ # Run process object received over pipe
+ parent_sentinel = os.dup(child_r)
+ code = spawn._main(child_r, parent_sentinel)
+
+ return code
+
+
+#
+# Read and write signed numbers
+#
+
+def read_signed(fd):
+ data = b''
+ length = SIGNED_STRUCT.size
+ while len(data) < length:
+ s = os.read(fd, length - len(data))
+ if not s:
+ raise EOFError('unexpected EOF')
+ data += s
+ return SIGNED_STRUCT.unpack(data)[0]
+
+def write_signed(fd, n):
+ msg = SIGNED_STRUCT.pack(n)
+ while msg:
+ nbytes = os.write(fd, msg)
+ if nbytes == 0:
+ raise RuntimeError('should not get here')
+ msg = msg[nbytes:]
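+
+# Round-trip sketch: each value is framed as one native signed 64-bit ('q')
+# struct, so for example
+#
+#     r, w = os.pipe()
+#     write_signed(w, -12345)
+#     assert read_signed(r) == -12345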
+
+#
+#
+#
+
+_forkserver = ForkServer()
+ensure_running = _forkserver.ensure_running
+get_inherited_fds = _forkserver.get_inherited_fds
+connect_to_new_process = _forkserver.connect_to_new_process
+set_forkserver_preload = _forkserver.set_forkserver_preload
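+
+# Typical use goes through the high-level multiprocessing API rather than
+# this module directly, e.g. (illustrative; some_function is your callable):
+#
+#     import multiprocessing as mp
+#     mp.set_start_method('forkserver')
+#     p = mp.Process(target=some_function)
+#     p.start(); p.join()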
diff --git a/contrib/tools/python3/Lib/multiprocessing/heap.py b/contrib/tools/python3/Lib/multiprocessing/heap.py
new file mode 100644
index 0000000000..6217dfe126
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/heap.py
@@ -0,0 +1,337 @@
+#
+# Module which supports allocation of memory from an mmap
+#
+# multiprocessing/heap.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import bisect
+from collections import defaultdict
+import mmap
+import os
+import sys
+import tempfile
+import threading
+
+from .context import reduction, assert_spawning
+from . import util
+
+__all__ = ['BufferWrapper']
+
+#
+# Inheritable class which wraps an mmap, and from which blocks can be allocated
+#
+
+if sys.platform == 'win32':
+
+ import _winapi
+
+ class Arena(object):
+ """
+ A shared memory area backed by anonymous memory (Windows).
+ """
+
+ _rand = tempfile._RandomNameSequence()
+
+ def __init__(self, size):
+ self.size = size
+ for i in range(100):
+ name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
+ buf = mmap.mmap(-1, size, tagname=name)
+ if _winapi.GetLastError() == 0:
+ break
+ # We have reopened a preexisting mmap.
+ buf.close()
+ else:
+ raise FileExistsError('Cannot find name for new mmap')
+ self.name = name
+ self.buffer = buf
+ self._state = (self.size, self.name)
+
+ def __getstate__(self):
+ assert_spawning(self)
+ return self._state
+
+ def __setstate__(self, state):
+ self.size, self.name = self._state = state
+ # Reopen existing mmap
+ self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
+ # XXX Temporarily preventing buildbot failures while determining
+ # XXX the correct long-term fix. See issue 23060
+ #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS
+
+else:
+
+ class Arena(object):
+ """
+ A shared memory area backed by a temporary file (POSIX).
+ """
+
+ if sys.platform == 'linux':
+ _dir_candidates = ['/dev/shm']
+ else:
+ _dir_candidates = []
+
+ def __init__(self, size, fd=-1):
+ self.size = size
+ self.fd = fd
+ if fd == -1:
+ # Arena is created anew (if fd != -1, it means we're coming
+ # from rebuild_arena() below)
+ self.fd, name = tempfile.mkstemp(
+ prefix='pym-%d-' % os.getpid(),
+ dir=self._choose_dir(size))
+ os.unlink(name)
+ util.Finalize(self, os.close, (self.fd,))
+ os.ftruncate(self.fd, size)
+ self.buffer = mmap.mmap(self.fd, self.size)
+
+ def _choose_dir(self, size):
+ # Choose a non-storage backed directory if possible,
+ # to improve performance
+ for d in self._dir_candidates:
+ st = os.statvfs(d)
+ if st.f_bavail * st.f_frsize >= size: # enough free space?
+ return d
+ return util.get_temp_dir()
+
+ def reduce_arena(a):
+ if a.fd == -1:
+ raise ValueError('Arena is unpicklable because '
+ 'forking was enabled when it was created')
+ return rebuild_arena, (a.size, reduction.DupFd(a.fd))
+
+ def rebuild_arena(size, dupfd):
+ return Arena(size, dupfd.detach())
+
+ reduction.register(Arena, reduce_arena)
+
+#
+# Class allowing allocation of chunks of memory from arenas
+#
+
+class Heap(object):
+
+ # Minimum malloc() alignment
+ _alignment = 8
+
+ _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2 # 4 MB
+ _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2
+
+ def __init__(self, size=mmap.PAGESIZE):
+ self._lastpid = os.getpid()
+ self._lock = threading.Lock()
+ # Current arena allocation size
+ self._size = size
+ # A sorted list of available block sizes in arenas
+ self._lengths = []
+
+ # Free block management:
+ # - map each block size to a list of `(Arena, start, stop)` blocks
+ self._len_to_seq = {}
+ # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
+ # starting at that offset
+ self._start_to_block = {}
+ # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
+ # ending at that offset
+ self._stop_to_block = {}
+
+ # Map arenas to their `(Arena, start, stop)` blocks in use
+ self._allocated_blocks = defaultdict(set)
+ self._arenas = []
+
+ # List of pending blocks to free - see comment in free() below
+ self._pending_free_blocks = []
+
+ # Statistics
+ self._n_mallocs = 0
+ self._n_frees = 0
+
+ @staticmethod
+ def _roundup(n, alignment):
+ # alignment must be a power of 2
+ mask = alignment - 1
+ return (n + mask) & ~mask
+
+ def _new_arena(self, size):
+ # Create a new arena with at least the given *size*
+ length = self._roundup(max(self._size, size), mmap.PAGESIZE)
+ # We carve larger and larger arenas, for efficiency, until we
+ # reach a large-ish size (roughly L3 cache-sized)
+ if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
+ self._size *= 2
+ util.info('allocating a new mmap of length %d', length)
+ arena = Arena(length)
+ self._arenas.append(arena)
+ return (arena, 0, length)
+
+ def _discard_arena(self, arena):
+ # Possibly delete the given (unused) arena
+ length = arena.size
+ # Reusing an existing arena is faster than creating a new one, so
+ # we only reclaim space if it's large enough.
+ if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
+ return
+ blocks = self._allocated_blocks.pop(arena)
+ assert not blocks
+ del self._start_to_block[(arena, 0)]
+ del self._stop_to_block[(arena, length)]
+ self._arenas.remove(arena)
+ seq = self._len_to_seq[length]
+ seq.remove((arena, 0, length))
+ if not seq:
+ del self._len_to_seq[length]
+ self._lengths.remove(length)
+
+ def _malloc(self, size):
+ # returns a large enough block -- it might be much larger
+ i = bisect.bisect_left(self._lengths, size)
+ if i == len(self._lengths):
+ return self._new_arena(size)
+ else:
+ length = self._lengths[i]
+ seq = self._len_to_seq[length]
+ block = seq.pop()
+ if not seq:
+ del self._len_to_seq[length], self._lengths[i]
+
+ (arena, start, stop) = block
+ del self._start_to_block[(arena, start)]
+ del self._stop_to_block[(arena, stop)]
+ return block
+
+ def _add_free_block(self, block):
+ # make block available and try to merge with its neighbours in the arena
+ (arena, start, stop) = block
+
+ try:
+ prev_block = self._stop_to_block[(arena, start)]
+ except KeyError:
+ pass
+ else:
+ start, _ = self._absorb(prev_block)
+
+ try:
+ next_block = self._start_to_block[(arena, stop)]
+ except KeyError:
+ pass
+ else:
+ _, stop = self._absorb(next_block)
+
+ block = (arena, start, stop)
+ length = stop - start
+
+ try:
+ self._len_to_seq[length].append(block)
+ except KeyError:
+ self._len_to_seq[length] = [block]
+ bisect.insort(self._lengths, length)
+
+ self._start_to_block[(arena, start)] = block
+ self._stop_to_block[(arena, stop)] = block
+
+ def _absorb(self, block):
+ # deregister this block so it can be merged with a neighbour
+ (arena, start, stop) = block
+ del self._start_to_block[(arena, start)]
+ del self._stop_to_block[(arena, stop)]
+
+ length = stop - start
+ seq = self._len_to_seq[length]
+ seq.remove(block)
+ if not seq:
+ del self._len_to_seq[length]
+ self._lengths.remove(length)
+
+ return start, stop
+
+ def _remove_allocated_block(self, block):
+ arena, start, stop = block
+ blocks = self._allocated_blocks[arena]
+ blocks.remove((start, stop))
+ if not blocks:
+ # Arena is entirely free, discard it from this process
+ self._discard_arena(arena)
+
+ def _free_pending_blocks(self):
+ # Free all the blocks in the pending list - called with the lock held.
+ while True:
+ try:
+ block = self._pending_free_blocks.pop()
+ except IndexError:
+ break
+ self._add_free_block(block)
+ self._remove_allocated_block(block)
+
+ def free(self, block):
+ # free a block returned by malloc()
+ # Since free() can be called asynchronously by the GC, it could happen
+ # that it's called while self._lock is held: in that case,
+ # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
+ # trylock is used instead, and if the lock can't be acquired
+ # immediately, the block is added to a list of blocks to be freed
+ # synchronously sometime later from malloc() or free(), by calling
+ # _free_pending_blocks() (appending and retrieving from a list is not
+ # strictly thread-safe but under CPython it's atomic thanks to the GIL).
+ if os.getpid() != self._lastpid:
+ raise ValueError(
+ "My pid ({0:n}) is not last pid {1:n}".format(
+ os.getpid(), self._lastpid))
+ if not self._lock.acquire(False):
+ # can't acquire the lock right now, add the block to the list of
+ # pending blocks to free
+ self._pending_free_blocks.append(block)
+ else:
+ # we hold the lock
+ try:
+ self._n_frees += 1
+ self._free_pending_blocks()
+ self._add_free_block(block)
+ self._remove_allocated_block(block)
+ finally:
+ self._lock.release()
+
+ def malloc(self, size):
+ # return a block of right size (possibly rounded up)
+ if size < 0:
+ raise ValueError("Size {0:n} out of range".format(size))
+ if sys.maxsize <= size:
+ raise OverflowError("Size {0:n} too large".format(size))
+ if os.getpid() != self._lastpid:
+ self.__init__() # reinitialize after fork
+ with self._lock:
+ self._n_mallocs += 1
+ # allow pending blocks to be marked available
+ self._free_pending_blocks()
+ size = self._roundup(max(size, 1), self._alignment)
+ (arena, start, stop) = self._malloc(size)
+ real_stop = start + size
+ if real_stop < stop:
+ # if the returned block is larger than necessary, mark
+ # the remainder available
+ self._add_free_block((arena, real_stop, stop))
+ self._allocated_blocks[arena].add((start, real_stop))
+ return (arena, start, real_stop)
+
+#
+# Class wrapping a block allocated out of a Heap -- can be inherited by child process
+#
+
+class BufferWrapper(object):
+
+ _heap = Heap()
+
+ def __init__(self, size):
+ if size < 0:
+ raise ValueError("Size {0:n} out of range".format(size))
+ if sys.maxsize <= size:
+ raise OverflowError("Size {0:n} too large".format(size))
+ block = BufferWrapper._heap.malloc(size)
+ self._state = (block, size)
+ util.Finalize(self, BufferWrapper._heap.free, args=(block,))
+
+ def create_memoryview(self):
+ (arena, start, stop), size = self._state
+ return memoryview(arena.buffer)[start:start+size]
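+
+# Illustrative sketch (not part of the upstream module): BufferWrapper is
+# the low-level building block used by multiprocessing.sharedctypes, e.g.
+#
+#     bw = BufferWrapper(16)          # reserve 16 bytes from the shared heap
+#     view = bw.create_memoryview()   # writable view over the block
+#     view[:4] = b'abcd'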
diff --git a/contrib/tools/python3/Lib/multiprocessing/managers.py b/contrib/tools/python3/Lib/multiprocessing/managers.py
new file mode 100644
index 0000000000..75d9c18c20
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/managers.py
@@ -0,0 +1,1380 @@
+#
+# Module providing manager classes for dealing
+# with shared objects
+#
+# multiprocessing/managers.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
+
+#
+# Imports
+#
+
+import sys
+import threading
+import signal
+import array
+import queue
+import time
+import types
+import os
+from os import getpid
+
+from traceback import format_exc
+
+from . import connection
+from .context import reduction, get_spawning_popen, ProcessError
+from . import pool
+from . import process
+from . import util
+from . import get_context
+try:
+ from . import shared_memory
+except ImportError:
+ HAS_SHMEM = False
+else:
+ HAS_SHMEM = True
+ __all__.append('SharedMemoryManager')
+
+#
+# Register some things for pickling
+#
+
+def reduce_array(a):
+ return array.array, (a.typecode, a.tobytes())
+reduction.register(array.array, reduce_array)
+
+view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
+def rebuild_as_list(obj):
+ return list, (list(obj),)
+for view_type in view_types:
+ reduction.register(view_type, rebuild_as_list)
+del view_type, view_types
+
+#
+# Type for identifying shared objects
+#
+
+class Token(object):
+ '''
+ Type to uniquely identify a shared object
+ '''
+ __slots__ = ('typeid', 'address', 'id')
+
+ def __init__(self, typeid, address, id):
+ (self.typeid, self.address, self.id) = (typeid, address, id)
+
+ def __getstate__(self):
+ return (self.typeid, self.address, self.id)
+
+ def __setstate__(self, state):
+ (self.typeid, self.address, self.id) = state
+
+ def __repr__(self):
+ return '%s(typeid=%r, address=%r, id=%r)' % \
+ (self.__class__.__name__, self.typeid, self.address, self.id)
+
+#
+# Function for communication with a manager's server process
+#
+
+def dispatch(c, id, methodname, args=(), kwds={}):
+ '''
+ Send a message to manager using connection `c` and return response
+ '''
+ c.send((id, methodname, args, kwds))
+ kind, result = c.recv()
+ if kind == '#RETURN':
+ return result
+ raise convert_to_error(kind, result)
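+
+# For instance (illustrative): a client holding an authenticated connection
+# `conn` to a running manager server can fetch its debug information with
+#
+#     info = dispatch(conn, None, 'debug_info')
+#
+# which sends (None, 'debug_info', (), {}) and unwraps the '#RETURN' reply.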
+
+def convert_to_error(kind, result):
+ if kind == '#ERROR':
+ return result
+ elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
+ if not isinstance(result, str):
+ raise TypeError(
+ "Result {0!r} (kind '{1}') type is {2}, not str".format(
+ result, kind, type(result)))
+ if kind == '#UNSERIALIZABLE':
+ return RemoteError('Unserializable message: %s\n' % result)
+ else:
+ return RemoteError(result)
+ else:
+ return ValueError('Unrecognized message type {!r}'.format(kind))
+
+class RemoteError(Exception):
+ def __str__(self):
+ return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
+
+#
+# Functions for finding the method names of an object
+#
+
+def all_methods(obj):
+ '''
+ Return a list of names of methods of `obj`
+ '''
+ temp = []
+ for name in dir(obj):
+ func = getattr(obj, name)
+ if callable(func):
+ temp.append(name)
+ return temp
+
+def public_methods(obj):
+ '''
+ Return a list of names of methods of `obj` which do not start with '_'
+ '''
+ return [name for name in all_methods(obj) if name[0] != '_']
+
+#
+# Server which is run in a process controlled by a manager
+#
+
+class Server(object):
+ '''
+ Server class which runs in a process controlled by a manager object
+ '''
+ public = ['shutdown', 'create', 'accept_connection', 'get_methods',
+ 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
+
+ def __init__(self, registry, address, authkey, serializer):
+ if not isinstance(authkey, bytes):
+ raise TypeError(
+ "Authkey {0!r} is type {1!s}, not bytes".format(
+ authkey, type(authkey)))
+ self.registry = registry
+ self.authkey = process.AuthenticationString(authkey)
+ Listener, Client = listener_client[serializer]
+
+ # do authentication later
+ self.listener = Listener(address=address, backlog=128)
+ self.address = self.listener.address
+
+ self.id_to_obj = {'0': (None, ())}
+ self.id_to_refcount = {}
+ self.id_to_local_proxy_obj = {}
+ self.mutex = threading.Lock()
+
+ def serve_forever(self):
+ '''
+ Run the server forever
+ '''
+ self.stop_event = threading.Event()
+ process.current_process()._manager_server = self
+ try:
+ accepter = threading.Thread(target=self.accepter)
+ accepter.daemon = True
+ accepter.start()
+ try:
+ while not self.stop_event.is_set():
+ self.stop_event.wait(1)
+ except (KeyboardInterrupt, SystemExit):
+ pass
+ finally:
+ if sys.stdout != sys.__stdout__: # what about stderr?
+ util.debug('resetting stdout, stderr')
+ sys.stdout = sys.__stdout__
+ sys.stderr = sys.__stderr__
+ sys.exit(0)
+
+ def accepter(self):
+ while True:
+ try:
+ c = self.listener.accept()
+ except OSError:
+ continue
+ t = threading.Thread(target=self.handle_request, args=(c,))
+ t.daemon = True
+ t.start()
+
+ def _handle_request(self, c):
+ request = None
+ try:
+ connection.deliver_challenge(c, self.authkey)
+ connection.answer_challenge(c, self.authkey)
+ request = c.recv()
+ ignore, funcname, args, kwds = request
+ assert funcname in self.public, '%r unrecognized' % funcname
+ func = getattr(self, funcname)
+ except Exception:
+ msg = ('#TRACEBACK', format_exc())
+ else:
+ try:
+ result = func(c, *args, **kwds)
+ except Exception:
+ msg = ('#TRACEBACK', format_exc())
+ else:
+ msg = ('#RETURN', result)
+
+ try:
+ c.send(msg)
+ except Exception as e:
+ try:
+ c.send(('#TRACEBACK', format_exc()))
+ except Exception:
+ pass
+ util.info('Failure to send message: %r', msg)
+ util.info(' ... request was %r', request)
+ util.info(' ... exception was %r', e)
+
+ def handle_request(self, conn):
+ '''
+ Handle a new connection
+ '''
+ try:
+ self._handle_request(conn)
+ except SystemExit:
+ # Server.serve_client() calls sys.exit(0) on EOF
+ pass
+ finally:
+ conn.close()
+
+ def serve_client(self, conn):
+ '''
+ Handle requests from the proxies in a particular process/thread
+ '''
+ util.debug('starting server thread to service %r',
+ threading.current_thread().name)
+
+ recv = conn.recv
+ send = conn.send
+ id_to_obj = self.id_to_obj
+
+ while not self.stop_event.is_set():
+
+ try:
+ methodname = obj = None
+ request = recv()
+ ident, methodname, args, kwds = request
+ try:
+ obj, exposed, gettypeid = id_to_obj[ident]
+ except KeyError as ke:
+ try:
+ obj, exposed, gettypeid = \
+ self.id_to_local_proxy_obj[ident]
+ except KeyError:
+ raise ke
+
+ if methodname not in exposed:
+ raise AttributeError(
+ 'method %r of %r object is not in exposed=%r' %
+ (methodname, type(obj), exposed)
+ )
+
+ function = getattr(obj, methodname)
+
+ try:
+ res = function(*args, **kwds)
+ except Exception as e:
+ msg = ('#ERROR', e)
+ else:
+ typeid = gettypeid and gettypeid.get(methodname, None)
+ if typeid:
+ rident, rexposed = self.create(conn, typeid, res)
+ token = Token(typeid, self.address, rident)
+ msg = ('#PROXY', (rexposed, token))
+ else:
+ msg = ('#RETURN', res)
+
+ except AttributeError:
+ if methodname is None:
+ msg = ('#TRACEBACK', format_exc())
+ else:
+ try:
+ fallback_func = self.fallback_mapping[methodname]
+ result = fallback_func(
+ self, conn, ident, obj, *args, **kwds
+ )
+ msg = ('#RETURN', result)
+ except Exception:
+ msg = ('#TRACEBACK', format_exc())
+
+ except EOFError:
+ util.debug('got EOF -- exiting thread serving %r',
+ threading.current_thread().name)
+ sys.exit(0)
+
+ except Exception:
+ msg = ('#TRACEBACK', format_exc())
+
+ try:
+ try:
+ send(msg)
+ except Exception:
+ send(('#UNSERIALIZABLE', format_exc()))
+ except Exception as e:
+ util.info('exception in thread serving %r',
+ threading.current_thread().name)
+ util.info(' ... message was %r', msg)
+ util.info(' ... exception was %r', e)
+ conn.close()
+ sys.exit(1)
+
+ def fallback_getvalue(self, conn, ident, obj):
+ return obj
+
+ def fallback_str(self, conn, ident, obj):
+ return str(obj)
+
+ def fallback_repr(self, conn, ident, obj):
+ return repr(obj)
+
+ fallback_mapping = {
+ '__str__':fallback_str,
+ '__repr__':fallback_repr,
+ '#GETVALUE':fallback_getvalue
+ }
+
+ def dummy(self, c):
+ pass
+
+ def debug_info(self, c):
+ '''
+ Return some info --- useful to spot problems with refcounting
+ '''
+ # Perhaps include debug info about 'c'?
+ with self.mutex:
+ result = []
+ keys = list(self.id_to_refcount.keys())
+ keys.sort()
+ for ident in keys:
+ if ident != '0':
+ result.append(' %s: refcount=%s\n %s' %
+ (ident, self.id_to_refcount[ident],
+ str(self.id_to_obj[ident][0])[:75]))
+ return '\n'.join(result)
+
+ def number_of_objects(self, c):
+ '''
+ Number of shared objects
+ '''
+ # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
+ return len(self.id_to_refcount)
+
+ def shutdown(self, c):
+ '''
+ Shut down this process
+ '''
+ try:
+ util.debug('manager received shutdown message')
+ c.send(('#RETURN', None))
+ except:
+ import traceback
+ traceback.print_exc()
+ finally:
+ self.stop_event.set()
+
+ def create(self, c, typeid, /, *args, **kwds):
+ '''
+ Create a new shared object and return its id
+ '''
+ with self.mutex:
+ callable, exposed, method_to_typeid, proxytype = \
+ self.registry[typeid]
+
+ if callable is None:
+ if kwds or (len(args) != 1):
+ raise ValueError(
+ "Without callable, must have one non-keyword argument")
+ obj = args[0]
+ else:
+ obj = callable(*args, **kwds)
+
+ if exposed is None:
+ exposed = public_methods(obj)
+ if method_to_typeid is not None:
+ if not isinstance(method_to_typeid, dict):
+ raise TypeError(
+ "Method_to_typeid {0!r}: type {1!s}, not dict".format(
+ method_to_typeid, type(method_to_typeid)))
+ exposed = list(exposed) + list(method_to_typeid)
+
+ ident = '%x' % id(obj) # convert to string because xmlrpclib
+ # only has 32 bit signed integers
+ util.debug('%r callable returned object with id %r', typeid, ident)
+
+ self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
+ if ident not in self.id_to_refcount:
+ self.id_to_refcount[ident] = 0
+
+ self.incref(c, ident)
+ return ident, tuple(exposed)
+
+ def get_methods(self, c, token):
+ '''
+ Return the methods of the shared object indicated by token
+ '''
+ return tuple(self.id_to_obj[token.id][1])
+
+ def accept_connection(self, c, name):
+ '''
+ Spawn a new thread to serve this connection
+ '''
+ threading.current_thread().name = name
+ c.send(('#RETURN', None))
+ self.serve_client(c)
+
+ def incref(self, c, ident):
+ with self.mutex:
+ try:
+ self.id_to_refcount[ident] += 1
+ except KeyError as ke:
+ # If no external references exist but an internal (to the
+ # manager) still does and a new external reference is created
+ # from it, restore the manager's tracking of it from the
+ # previously stashed internal ref.
+ if ident in self.id_to_local_proxy_obj:
+ self.id_to_refcount[ident] = 1
+ self.id_to_obj[ident] = \
+ self.id_to_local_proxy_obj[ident]
+ util.debug('Server re-enabled tracking & INCREF %r', ident)
+ else:
+ raise ke
+
+ def decref(self, c, ident):
+ if ident not in self.id_to_refcount and \
+ ident in self.id_to_local_proxy_obj:
+ util.debug('Server DECREF skipping %r', ident)
+ return
+
+ with self.mutex:
+ if self.id_to_refcount[ident] <= 0:
+ raise AssertionError(
+ "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
+ ident, self.id_to_obj[ident],
+ self.id_to_refcount[ident]))
+ self.id_to_refcount[ident] -= 1
+ if self.id_to_refcount[ident] == 0:
+ del self.id_to_refcount[ident]
+
+ if ident not in self.id_to_refcount:
+ # Two-step process in case the object turns out to contain other
+ # proxy objects (e.g. a managed list of managed lists).
+ # Otherwise, deleting self.id_to_obj[ident] would trigger the
+ # deleting of the stored value (another managed object) which would
+ # in turn attempt to acquire the mutex that is already held here.
+ self.id_to_obj[ident] = (None, (), None) # thread-safe
+ util.debug('disposing of obj with id %r', ident)
+ with self.mutex:
+ del self.id_to_obj[ident]
+
+
+#
+# Class to represent state of a manager
+#
+
+class State(object):
+ __slots__ = ['value']
+ INITIAL = 0
+ STARTED = 1
+ SHUTDOWN = 2
+
+#
+# Mapping from serializer name to Listener and Client types
+#
+
+listener_client = {
+ 'pickle' : (connection.Listener, connection.Client),
+ 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
+ }
+
+#
+# Definition of BaseManager
+#
+
+class BaseManager(object):
+ '''
+ Base class for managers
+ '''
+ _registry = {}
+ _Server = Server
+
+ def __init__(self, address=None, authkey=None, serializer='pickle',
+ ctx=None, *, shutdown_timeout=1.0):
+ if authkey is None:
+ authkey = process.current_process().authkey
+ self._address = address # XXX not final address if e.g. ('', 0)
+ self._authkey = process.AuthenticationString(authkey)
+ self._state = State()
+ self._state.value = State.INITIAL
+ self._serializer = serializer
+ self._Listener, self._Client = listener_client[serializer]
+ self._ctx = ctx or get_context()
+ self._shutdown_timeout = shutdown_timeout
+
+ def get_server(self):
+ '''
+ Return server object with serve_forever() method and address attribute
+ '''
+ if self._state.value != State.INITIAL:
+ if self._state.value == State.STARTED:
+ raise ProcessError("Already started server")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("Manager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
+ return Server(self._registry, self._address,
+ self._authkey, self._serializer)
+
+ def connect(self):
+ '''
+ Connect manager object to the server process
+ '''
+ Listener, Client = listener_client[self._serializer]
+ conn = Client(self._address, authkey=self._authkey)
+ dispatch(conn, None, 'dummy')
+ self._state.value = State.STARTED
+
+ def start(self, initializer=None, initargs=()):
+ '''
+ Spawn a server process for this manager object
+ '''
+ if self._state.value != State.INITIAL:
+ if self._state.value == State.STARTED:
+ raise ProcessError("Already started server")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("Manager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
+
+ if initializer is not None and not callable(initializer):
+ raise TypeError('initializer must be a callable')
+
+ # pipe over which we will retrieve address of server
+ reader, writer = connection.Pipe(duplex=False)
+
+ # spawn process which runs a server
+ self._process = self._ctx.Process(
+ target=type(self)._run_server,
+ args=(self._registry, self._address, self._authkey,
+ self._serializer, writer, initializer, initargs),
+ )
+ ident = ':'.join(str(i) for i in self._process._identity)
+ self._process.name = type(self).__name__ + '-' + ident
+ self._process.start()
+
+ # get address of server
+ writer.close()
+ self._address = reader.recv()
+ reader.close()
+
+ # register a finalizer
+ self._state.value = State.STARTED
+ self.shutdown = util.Finalize(
+ self, type(self)._finalize_manager,
+ args=(self._process, self._address, self._authkey, self._state,
+ self._Client, self._shutdown_timeout),
+ exitpriority=0
+ )
+
+ @classmethod
+ def _run_server(cls, registry, address, authkey, serializer, writer,
+ initializer=None, initargs=()):
+ '''
+ Create a server, report its address and run it
+ '''
+ # bpo-36368: protect server process from KeyboardInterrupt signals
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ if initializer is not None:
+ initializer(*initargs)
+
+ # create server
+ server = cls._Server(registry, address, authkey, serializer)
+
+ # inform parent process of the server's address
+ writer.send(server.address)
+ writer.close()
+
+ # run the manager
+ util.info('manager serving at %r', server.address)
+ server.serve_forever()
+
+ def _create(self, typeid, /, *args, **kwds):
+ '''
+ Create a new shared object; return the token and exposed tuple
+ '''
+ assert self._state.value == State.STARTED, 'server not yet started'
+ conn = self._Client(self._address, authkey=self._authkey)
+ try:
+ id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
+ finally:
+ conn.close()
+ return Token(typeid, self._address, id), exposed
+
+ def join(self, timeout=None):
+ '''
+ Join the manager process (if it has been spawned)
+ '''
+ if self._process is not None:
+ self._process.join(timeout)
+ if not self._process.is_alive():
+ self._process = None
+
+ def _debug_info(self):
+ '''
+ Return some info about the server's shared objects and connections
+ '''
+ conn = self._Client(self._address, authkey=self._authkey)
+ try:
+ return dispatch(conn, None, 'debug_info')
+ finally:
+ conn.close()
+
+ def _number_of_objects(self):
+ '''
+ Return the number of shared objects
+ '''
+ conn = self._Client(self._address, authkey=self._authkey)
+ try:
+ return dispatch(conn, None, 'number_of_objects')
+ finally:
+ conn.close()
+
+ def __enter__(self):
+ if self._state.value == State.INITIAL:
+ self.start()
+ if self._state.value != State.STARTED:
+ if self._state.value == State.INITIAL:
+ raise ProcessError("Unable to start server")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("Manager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.shutdown()
+
+ @staticmethod
+ def _finalize_manager(process, address, authkey, state, _Client,
+ shutdown_timeout):
+ '''
+ Shut down the manager process; will be registered as a finalizer
+ '''
+ if process.is_alive():
+ util.info('sending shutdown message to manager')
+ try:
+ conn = _Client(address, authkey=authkey)
+ try:
+ dispatch(conn, None, 'shutdown')
+ finally:
+ conn.close()
+ except Exception:
+ pass
+
+ process.join(timeout=shutdown_timeout)
+ if process.is_alive():
+ util.info('manager still alive')
+ if hasattr(process, 'terminate'):
+ util.info('trying to `terminate()` manager process')
+ process.terminate()
+ process.join(timeout=shutdown_timeout)
+ if process.is_alive():
+ util.info('manager still alive after terminate')
+ process.kill()
+ process.join()
+
+ state.value = State.SHUTDOWN
+ try:
+ del BaseProxy._address_to_local[address]
+ except KeyError:
+ pass
+
+ @property
+ def address(self):
+ return self._address
+
+ @classmethod
+ def register(cls, typeid, callable=None, proxytype=None, exposed=None,
+ method_to_typeid=None, create_method=True):
+ '''
+ Register a typeid with the manager type
+ '''
+ if '_registry' not in cls.__dict__:
+ cls._registry = cls._registry.copy()
+
+ if proxytype is None:
+ proxytype = AutoProxy
+
+ exposed = exposed or getattr(proxytype, '_exposed_', None)
+
+ method_to_typeid = method_to_typeid or \
+ getattr(proxytype, '_method_to_typeid_', None)
+
+ if method_to_typeid:
+ for key, value in list(method_to_typeid.items()): # isinstance?
+ assert type(key) is str, '%r is not a string' % key
+ assert type(value) is str, '%r is not a string' % value
+
+ cls._registry[typeid] = (
+ callable, exposed, method_to_typeid, proxytype
+ )
+
+ if create_method:
+ def temp(self, /, *args, **kwds):
+ util.debug('requesting creation of a shared %r object', typeid)
+ token, exp = self._create(typeid, *args, **kwds)
+ proxy = proxytype(
+ token, self._serializer, manager=self,
+ authkey=self._authkey, exposed=exp
+ )
+ conn = self._Client(token.address, authkey=self._authkey)
+ dispatch(conn, None, 'decref', (token.id,))
+ return proxy
+ temp.__name__ = typeid
+ setattr(cls, typeid, temp)
+
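+# Example of registering a custom type (adapted from the standard docs):
+#
+#     class MathsClass:
+#         def add(self, x, y):
+#             return x + y
+#
+#     class MyManager(BaseManager):
+#         pass
+#
+#     MyManager.register('Maths', MathsClass)
+#
+#     with MyManager() as manager:
+#         maths = manager.Maths()     # returns an AutoProxy
+#         assert maths.add(4, 3) == 7
+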
+#
+# Subclass of set which get cleared after a fork
+#
+
+class ProcessLocalSet(set):
+ def __init__(self):
+ util.register_after_fork(self, lambda obj: obj.clear())
+ def __reduce__(self):
+ return type(self), ()
+
+#
+# Definition of BaseProxy
+#
+
+class BaseProxy(object):
+ '''
+ A base for proxies of shared objects
+ '''
+ _address_to_local = {}
+ _mutex = util.ForkAwareThreadLock()
+
+ def __init__(self, token, serializer, manager=None,
+ authkey=None, exposed=None, incref=True, manager_owned=False):
+ with BaseProxy._mutex:
+ tls_idset = BaseProxy._address_to_local.get(token.address, None)
+ if tls_idset is None:
+ tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
+ BaseProxy._address_to_local[token.address] = tls_idset
+
+ # self._tls is used to record the connection used by this
+ # thread to communicate with the manager at token.address
+ self._tls = tls_idset[0]
+
+ # self._idset is used to record the identities of all shared
+ # objects for which the current process owns references and
+ # which are in the manager at token.address
+ self._idset = tls_idset[1]
+
+ self._token = token
+ self._id = self._token.id
+ self._manager = manager
+ self._serializer = serializer
+ self._Client = listener_client[serializer][1]
+
+ # Should be set to True only when a proxy object is being created
+ # on the manager server; primary use case: nested proxy objects.
+ # RebuildProxy detects when a proxy is being created on the manager
+ # and sets this value appropriately.
+ self._owned_by_manager = manager_owned
+
+ if authkey is not None:
+ self._authkey = process.AuthenticationString(authkey)
+ elif self._manager is not None:
+ self._authkey = self._manager._authkey
+ else:
+ self._authkey = process.current_process().authkey
+
+ if incref:
+ self._incref()
+
+ util.register_after_fork(self, BaseProxy._after_fork)
+
+ def _connect(self):
+ util.debug('making connection to manager')
+ name = process.current_process().name
+ if threading.current_thread().name != 'MainThread':
+ name += '|' + threading.current_thread().name
+ conn = self._Client(self._token.address, authkey=self._authkey)
+ dispatch(conn, None, 'accept_connection', (name,))
+ self._tls.connection = conn
+
+ def _callmethod(self, methodname, args=(), kwds={}):
+ '''
+ Try to call a method of the referent and return a copy of the result
+ '''
+ try:
+ conn = self._tls.connection
+ except AttributeError:
+ util.debug('thread %r does not own a connection',
+ threading.current_thread().name)
+ self._connect()
+ conn = self._tls.connection
+
+ conn.send((self._id, methodname, args, kwds))
+ kind, result = conn.recv()
+
+ if kind == '#RETURN':
+ return result
+ elif kind == '#PROXY':
+ exposed, token = result
+ proxytype = self._manager._registry[token.typeid][-1]
+ token.address = self._token.address
+ proxy = proxytype(
+ token, self._serializer, manager=self._manager,
+ authkey=self._authkey, exposed=exposed
+ )
+ conn = self._Client(token.address, authkey=self._authkey)
+ dispatch(conn, None, 'decref', (token.id,))
+ return proxy
+ raise convert_to_error(kind, result)
+
+ def _getvalue(self):
+ '''
+ Get a copy of the value of the referent
+ '''
+ return self._callmethod('#GETVALUE')
+
+ def _incref(self):
+ if self._owned_by_manager:
+ util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
+ return
+
+ conn = self._Client(self._token.address, authkey=self._authkey)
+ dispatch(conn, None, 'incref', (self._id,))
+ util.debug('INCREF %r', self._token.id)
+
+ self._idset.add(self._id)
+
+ state = self._manager and self._manager._state
+
+ self._close = util.Finalize(
+ self, BaseProxy._decref,
+ args=(self._token, self._authkey, state,
+ self._tls, self._idset, self._Client),
+ exitpriority=10
+ )
+
+ @staticmethod
+ def _decref(token, authkey, state, tls, idset, _Client):
+ idset.discard(token.id)
+
+ # check whether manager is still alive
+ if state is None or state.value == State.STARTED:
+ # tell manager this process no longer cares about referent
+ try:
+ util.debug('DECREF %r', token.id)
+ conn = _Client(token.address, authkey=authkey)
+ dispatch(conn, None, 'decref', (token.id,))
+ except Exception as e:
+ util.debug('... decref failed %s', e)
+
+ else:
+ util.debug('DECREF %r -- manager already shutdown', token.id)
+
+ # check whether we can close this thread's connection because
+ # the process owns no more references to objects for this manager
+ if not idset and hasattr(tls, 'connection'):
+ util.debug('thread %r has no more proxies so closing conn',
+ threading.current_thread().name)
+ tls.connection.close()
+ del tls.connection
+
+ def _after_fork(self):
+ self._manager = None
+ try:
+ self._incref()
+ except Exception as e:
+ # the proxy may just be for a manager which has shutdown
+ util.info('incref failed: %s' % e)
+
+ def __reduce__(self):
+ kwds = {}
+ if get_spawning_popen() is not None:
+ kwds['authkey'] = self._authkey
+
+ if getattr(self, '_isauto', False):
+ kwds['exposed'] = self._exposed_
+ return (RebuildProxy,
+ (AutoProxy, self._token, self._serializer, kwds))
+ else:
+ return (RebuildProxy,
+ (type(self), self._token, self._serializer, kwds))
+
+ def __deepcopy__(self, memo):
+ return self._getvalue()
+
+ def __repr__(self):
+ return '<%s object, typeid %r at %#x>' % \
+ (type(self).__name__, self._token.typeid, id(self))
+
+ def __str__(self):
+ '''
+ Return representation of the referent (or a fall-back if that fails)
+ '''
+ try:
+ return self._callmethod('__repr__')
+ except Exception:
+ return repr(self)[:-1] + "; '__str__()' failed>"
+
+#
+# Function used for unpickling
+#
+
+def RebuildProxy(func, token, serializer, kwds):
+ '''
+ Function used for unpickling proxy objects.
+ '''
+ server = getattr(process.current_process(), '_manager_server', None)
+ if server and server.address == token.address:
+ util.debug('Rebuild a proxy owned by manager, token=%r', token)
+ kwds['manager_owned'] = True
+ if token.id not in server.id_to_local_proxy_obj:
+ server.id_to_local_proxy_obj[token.id] = \
+ server.id_to_obj[token.id]
+ incref = (
+ kwds.pop('incref', True) and
+ not getattr(process.current_process(), '_inheriting', False)
+ )
+ return func(token, serializer, incref=incref, **kwds)
+
+#
+# Functions to create proxies and proxy types
+#
+
+def MakeProxyType(name, exposed, _cache={}):
+ '''
+ Return a proxy type whose methods are given by `exposed`
+ '''
+ exposed = tuple(exposed)
+ try:
+ return _cache[(name, exposed)]
+ except KeyError:
+ pass
+
+ dic = {}
+
+ for meth in exposed:
+ exec('''def %s(self, /, *args, **kwds):
+ return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
+
+ ProxyType = type(name, (BaseProxy,), dic)
+ ProxyType._exposed_ = exposed
+ _cache[(name, exposed)] = ProxyType
+ return ProxyType
+
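+# For instance (illustrative):
+#
+#     CounterProxy = MakeProxyType('CounterProxy', ('increment', 'get'))
+#
+# yields a BaseProxy subclass whose increment()/get() calls are forwarded
+# to the referent through _callmethod().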
+
+def AutoProxy(token, serializer, manager=None, authkey=None,
+ exposed=None, incref=True, manager_owned=False):
+ '''
+ Return an auto-proxy for `token`
+ '''
+ _Client = listener_client[serializer][1]
+
+ if exposed is None:
+ conn = _Client(token.address, authkey=authkey)
+ try:
+ exposed = dispatch(conn, None, 'get_methods', (token,))
+ finally:
+ conn.close()
+
+ if authkey is None and manager is not None:
+ authkey = manager._authkey
+ if authkey is None:
+ authkey = process.current_process().authkey
+
+ ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
+ proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
+ incref=incref, manager_owned=manager_owned)
+ proxy._isauto = True
+ return proxy
+
+#
+# Types/callables which we will register with SyncManager
+#
+
+class Namespace(object):
+ def __init__(self, /, **kwds):
+ self.__dict__.update(kwds)
+ def __repr__(self):
+ items = list(self.__dict__.items())
+ temp = []
+ for name, value in items:
+ if not name.startswith('_'):
+ temp.append('%s=%r' % (name, value))
+ temp.sort()
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
+
+class Value(object):
+ def __init__(self, typecode, value, lock=True):
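+ # `lock` exists for signature compatibility (cf. sharedctypes.Value)
+ # and is unused here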
+ self._typecode = typecode
+ self._value = value
+ def get(self):
+ return self._value
+ def set(self, value):
+ self._value = value
+ def __repr__(self):
+ return '%s(%r, %r)' % (type(self).__name__, self._typecode, self._value)
+ value = property(get, set)
+
+def Array(typecode, sequence, lock=True):
+ return array.array(typecode, sequence)
+
+#
+# Proxy types used by SyncManager
+#
+
+class IteratorProxy(BaseProxy):
+ _exposed_ = ('__next__', 'send', 'throw', 'close')
+ def __iter__(self):
+ return self
+ def __next__(self, *args):
+ return self._callmethod('__next__', args)
+ def send(self, *args):
+ return self._callmethod('send', args)
+ def throw(self, *args):
+ return self._callmethod('throw', args)
+ def close(self, *args):
+ return self._callmethod('close', args)
+
+
+class AcquirerProxy(BaseProxy):
+ _exposed_ = ('acquire', 'release')
+ def acquire(self, blocking=True, timeout=None):
+ args = (blocking,) if timeout is None else (blocking, timeout)
+ return self._callmethod('acquire', args)
+ def release(self):
+ return self._callmethod('release')
+ def __enter__(self):
+ return self._callmethod('acquire')
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return self._callmethod('release')
+
+
+class ConditionProxy(AcquirerProxy):
+ _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+ def notify(self, n=1):
+ return self._callmethod('notify', (n,))
+ def notify_all(self):
+ return self._callmethod('notify_all')
+ def wait_for(self, predicate, timeout=None):
+ result = predicate()
+ if result:
+ return result
+ if timeout is not None:
+ endtime = time.monotonic() + timeout
+ else:
+ endtime = None
+ waittime = None
+ while not result:
+ if endtime is not None:
+ waittime = endtime - time.monotonic()
+ if waittime <= 0:
+ break
+ self.wait(waittime)
+ result = predicate()
+ return result
+
+
+class EventProxy(BaseProxy):
+ _exposed_ = ('is_set', 'set', 'clear', 'wait')
+ def is_set(self):
+ return self._callmethod('is_set')
+ def set(self):
+ return self._callmethod('set')
+ def clear(self):
+ return self._callmethod('clear')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+
+
+class BarrierProxy(BaseProxy):
+ _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+ def abort(self):
+ return self._callmethod('abort')
+ def reset(self):
+ return self._callmethod('reset')
+ @property
+ def parties(self):
+ return self._callmethod('__getattribute__', ('parties',))
+ @property
+ def n_waiting(self):
+ return self._callmethod('__getattribute__', ('n_waiting',))
+ @property
+ def broken(self):
+ return self._callmethod('__getattribute__', ('broken',))
+
+
+class NamespaceProxy(BaseProxy):
+ _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
+ def __getattr__(self, key):
+ if key[0] == '_':
+ return object.__getattribute__(self, key)
+ callmethod = object.__getattribute__(self, '_callmethod')
+ return callmethod('__getattribute__', (key,))
+ def __setattr__(self, key, value):
+ if key[0] == '_':
+ return object.__setattr__(self, key, value)
+ callmethod = object.__getattribute__(self, '_callmethod')
+ return callmethod('__setattr__', (key, value))
+ def __delattr__(self, key):
+ if key[0] == '_':
+ return object.__delattr__(self, key)
+ callmethod = object.__getattribute__(self, '_callmethod')
+ return callmethod('__delattr__', (key,))
+
+
+class ValueProxy(BaseProxy):
+ _exposed_ = ('get', 'set')
+ def get(self):
+ return self._callmethod('get')
+ def set(self, value):
+ return self._callmethod('set', (value,))
+ value = property(get, set)
+
+ __class_getitem__ = classmethod(types.GenericAlias)
+
+
+BaseListProxy = MakeProxyType('BaseListProxy', (
+ '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
+ '__mul__', '__reversed__', '__rmul__', '__setitem__',
+ 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
+ 'reverse', 'sort', '__imul__'
+ ))
+class ListProxy(BaseListProxy):
+ def __iadd__(self, value):
+ self._callmethod('extend', (value,))
+ return self
+ def __imul__(self, value):
+ self._callmethod('__imul__', (value,))
+ return self
+
+
+DictProxy = MakeProxyType('DictProxy', (
+ '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
+ '__setitem__', 'clear', 'copy', 'get', 'items',
+ 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
+ ))
+DictProxy._method_to_typeid_ = {
+ '__iter__': 'Iterator',
+ }
+
+
+ArrayProxy = MakeProxyType('ArrayProxy', (
+ '__len__', '__getitem__', '__setitem__'
+ ))
+
+
+BasePoolProxy = MakeProxyType('PoolProxy', (
+ 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
+ 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
+ ))
+BasePoolProxy._method_to_typeid_ = {
+ 'apply_async': 'AsyncResult',
+ 'map_async': 'AsyncResult',
+ 'starmap_async': 'AsyncResult',
+ 'imap': 'Iterator',
+ 'imap_unordered': 'Iterator'
+ }
+class PoolProxy(BasePoolProxy):
+ def __enter__(self):
+ return self
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.terminate()
+
+#
+# Definition of SyncManager
+#
+
+class SyncManager(BaseManager):
+ '''
+ Subclass of `BaseManager` which supports a number of shared object types.
+
+ The types registered are those intended for the synchronization
+ of threads, plus `dict`, `list` and `Namespace`.
+
+ The `multiprocessing.Manager()` function creates started instances of
+ this class.
+ '''
+
+SyncManager.register('Queue', queue.Queue)
+SyncManager.register('JoinableQueue', queue.Queue)
+SyncManager.register('Event', threading.Event, EventProxy)
+SyncManager.register('Lock', threading.Lock, AcquirerProxy)
+SyncManager.register('RLock', threading.RLock, AcquirerProxy)
+SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
+SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
+ AcquirerProxy)
+SyncManager.register('Condition', threading.Condition, ConditionProxy)
+SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
+SyncManager.register('Pool', pool.Pool, PoolProxy)
+SyncManager.register('list', list, ListProxy)
+SyncManager.register('dict', dict, DictProxy)
+SyncManager.register('Value', Value, ValueProxy)
+SyncManager.register('Array', Array, ArrayProxy)
+SyncManager.register('Namespace', Namespace, NamespaceProxy)
+
+# types returned by methods of PoolProxy
+SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
+SyncManager.register('AsyncResult', create_method=False)
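+
+# Typical use (illustrative) is through multiprocessing.Manager(), which
+# returns a started SyncManager:
+#
+#     import multiprocessing as mp
+#     with mp.Manager() as manager:
+#         d = manager.dict()
+#         lst = manager.list(range(10))
+#         # d and lst are proxies safe to share with child processes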
+
+#
+# Definition of SharedMemoryManager and SharedMemoryServer
+#
+
+if HAS_SHMEM:
+ class _SharedMemoryTracker:
+ "Manages one or more shared memory segments."
+
+ def __init__(self, name, segment_names=None):
+ self.shared_memory_context_name = name
+ # a fresh list per instance avoids a shared mutable default argument
+ self.segment_names = segment_names if segment_names is not None else []
+
+ def register_segment(self, segment_name):
+ "Adds the supplied shared memory block name to tracker."
+ util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
+ self.segment_names.append(segment_name)
+
+ def destroy_segment(self, segment_name):
+ """Calls unlink() on the shared memory block with the supplied name
+ and removes it from the list of blocks being tracked."""
+ util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
+ self.segment_names.remove(segment_name)
+ segment = shared_memory.SharedMemory(segment_name)
+ segment.close()
+ segment.unlink()
+
+ def unlink(self):
+ "Calls destroy_segment() on all tracked shared memory blocks."
+ for segment_name in self.segment_names[:]:
+ self.destroy_segment(segment_name)
+
+ def __del__(self):
+ util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
+ self.unlink()
+
+ def __getstate__(self):
+ return (self.shared_memory_context_name, self.segment_names)
+
+ def __setstate__(self, state):
+ self.__init__(*state)
+
+
+ class SharedMemoryServer(Server):
+
+ public = Server.public + \
+ ['track_segment', 'release_segment', 'list_segments']
+
+ def __init__(self, *args, **kwargs):
+ Server.__init__(self, *args, **kwargs)
+ address = self.address
+ # The address of Linux abstract namespaces can be bytes
+ if isinstance(address, bytes):
+ address = os.fsdecode(address)
+ self.shared_memory_context = \
+ _SharedMemoryTracker(f"shm_{address}_{getpid()}")
+ util.debug(f"SharedMemoryServer started by pid {getpid()}")
+
+ def create(self, c, typeid, /, *args, **kwargs):
+ """Create a new distributed-shared object (not backed by a shared
+ memory block) and return its id to be used in a Proxy Object."""
+ # Unless set up as a shared proxy, don't make shared_memory_context
+ # a standard part of kwargs. This makes things easier for supplying
+ # simple functions.
+ if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
+ kwargs['shared_memory_context'] = self.shared_memory_context
+ return Server.create(self, c, typeid, *args, **kwargs)
+
+ def shutdown(self, c):
+ "Call unlink() on all tracked shared memory, terminate the Server."
+ self.shared_memory_context.unlink()
+ return Server.shutdown(self, c)
+
+ def track_segment(self, c, segment_name):
+ "Adds the supplied shared memory block name to Server's tracker."
+ self.shared_memory_context.register_segment(segment_name)
+
+ def release_segment(self, c, segment_name):
+ """Calls unlink() on the shared memory block with the supplied name
+ and removes it from the tracker instance inside the Server."""
+ self.shared_memory_context.destroy_segment(segment_name)
+
+ def list_segments(self, c):
+ """Returns a list of names of shared memory blocks that the Server
+ is currently tracking."""
+ return self.shared_memory_context.segment_names
+
+
+ class SharedMemoryManager(BaseManager):
+ """Like SyncManager but uses SharedMemoryServer instead of Server.
+
+ It provides methods for creating and returning SharedMemory instances
+ and for creating a list-like object (ShareableList) backed by shared
+ memory. It also provides methods that create and return Proxy Objects
+ that support synchronization across processes (i.e. multi-process-safe
+ locks and semaphores).
+ """
+
+ _Server = SharedMemoryServer
+
+ def __init__(self, *args, **kwargs):
+ if os.name == "posix":
+ # bpo-36867: Ensure the resource_tracker is running before
+ # launching the manager process, so that concurrent
+ # shared_memory manipulation both in the manager and in the
+ # current process does not create two resource_tracker
+ # processes.
+ from . import resource_tracker
+ resource_tracker.ensure_running()
+ BaseManager.__init__(self, *args, **kwargs)
+ util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
+
+ def __del__(self):
+ util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
+
+ def get_server(self):
+ 'Better than monkeypatching for now; merge into Server ultimately'
+ if self._state.value != State.INITIAL:
+ if self._state.value == State.STARTED:
+ raise ProcessError("Already started SharedMemoryServer")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("SharedMemoryManager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
+ return self._Server(self._registry, self._address,
+ self._authkey, self._serializer)
+
+ def SharedMemory(self, size):
+ """Returns a new SharedMemory instance with the specified size in
+ bytes, to be tracked by the manager."""
+ with self._Client(self._address, authkey=self._authkey) as conn:
+ sms = shared_memory.SharedMemory(None, create=True, size=size)
+ try:
+ dispatch(conn, None, 'track_segment', (sms.name,))
+ except BaseException as e:
+ sms.unlink()
+ raise e
+ return sms
+
+ def ShareableList(self, sequence):
+ """Returns a new ShareableList instance populated with the values
+ from the input sequence, to be tracked by the manager."""
+ with self._Client(self._address, authkey=self._authkey) as conn:
+ sl = shared_memory.ShareableList(sequence)
+ try:
+ dispatch(conn, None, 'track_segment', (sl.shm.name,))
+ except BaseException as e:
+ sl.shm.unlink()
+ raise e
+ return sl
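+
+ # Example (illustrative), following the documented usage pattern:
+ #
+ #     with SharedMemoryManager() as smm:
+ #         sl = smm.ShareableList(range(2000))
+ #         # ... hand sl.shm.name to worker processes ...
+ #     # every tracked segment is unlinked when the block exits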
diff --git a/contrib/tools/python3/Lib/multiprocessing/pool.py b/contrib/tools/python3/Lib/multiprocessing/pool.py
new file mode 100644
index 0000000000..4f5d88cb97
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/pool.py
@@ -0,0 +1,957 @@
+#
+# Module providing the `Pool` class for managing a process pool
+#
+# multiprocessing/pool.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+__all__ = ['Pool', 'ThreadPool']
+
+#
+# Imports
+#
+
+import collections
+import itertools
+import os
+import queue
+import threading
+import time
+import traceback
+import types
+import warnings
+
+# If threading is available then ThreadPool should be provided. Therefore
+# we avoid top-level imports which are liable to fail on some systems.
+from . import util
+from . import get_context, TimeoutError
+from .connection import wait
+
+#
+# Constants representing the state of a pool
+#
+
+INIT = "INIT"
+RUN = "RUN"
+CLOSE = "CLOSE"
+TERMINATE = "TERMINATE"
+
+#
+# Miscellaneous
+#
+
+job_counter = itertools.count()
+
+def mapstar(args):
+ return list(map(*args))
+
+def starmapstar(args):
+ return list(itertools.starmap(args[0], args[1]))
+
+#
+# Hack to embed stringification of remote traceback in local traceback
+#
+
+class RemoteTraceback(Exception):
+ def __init__(self, tb):
+ self.tb = tb
+ def __str__(self):
+ return self.tb
+
+class ExceptionWithTraceback:
+ def __init__(self, exc, tb):
+ tb = traceback.format_exception(type(exc), exc, tb)
+ tb = ''.join(tb)
+ self.exc = exc
+ self.tb = '\n"""\n%s"""' % tb
+ def __reduce__(self):
+ return rebuild_exc, (self.exc, self.tb)
+
+def rebuild_exc(exc, tb):
+ exc.__cause__ = RemoteTraceback(tb)
+ return exc
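+
+# A sketch of the round trip (pickle assumed; names are illustrative):
+#
+#     try:
+#         1 / 0
+#     except Exception as e:
+#         wrapped = ExceptionWithTraceback(e, e.__traceback__)
+#     err = pickle.loads(pickle.dumps(wrapped))
+#     # err is the original ZeroDivisionError, with the remote traceback
+#     # string attached as err.__cause__ (a RemoteTraceback instance)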
+
+#
+# Code run by worker processes
+#
+
+class MaybeEncodingError(Exception):
+ """Wraps possible unpickleable errors, so they can be
+ safely sent through the socket."""
+
+ def __init__(self, exc, value):
+ self.exc = repr(exc)
+ self.value = repr(value)
+ super(MaybeEncodingError, self).__init__(self.exc, self.value)
+
+ def __str__(self):
+ return "Error sending result: '%s'. Reason: '%s'" % (self.value,
+ self.exc)
+
+ def __repr__(self):
+ return "<%s: %s>" % (self.__class__.__name__, self)
+
+
+def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
+ wrap_exception=False):
+ if (maxtasks is not None) and not (isinstance(maxtasks, int)
+ and maxtasks >= 1):
+ raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
+ put = outqueue.put
+ get = inqueue.get
+ if hasattr(inqueue, '_writer'):
+ inqueue._writer.close()
+ outqueue._reader.close()
+
+ if initializer is not None:
+ initializer(*initargs)
+
+ completed = 0
+ while maxtasks is None or (maxtasks and completed < maxtasks):
+ try:
+ task = get()
+ except (EOFError, OSError):
+ util.debug('worker got EOFError or OSError -- exiting')
+ break
+
+ if task is None:
+ util.debug('worker got sentinel -- exiting')
+ break
+
+ job, i, func, args, kwds = task
+ try:
+ result = (True, func(*args, **kwds))
+ except Exception as e:
+ if wrap_exception and func is not _helper_reraises_exception:
+ e = ExceptionWithTraceback(e, e.__traceback__)
+ result = (False, e)
+ try:
+ put((job, i, result))
+ except Exception as e:
+ wrapped = MaybeEncodingError(e, result[1])
+ util.debug("Possible encoding error while sending result: %s" % (
+ wrapped))
+ put((job, i, (False, wrapped)))
+
+ task = job = result = func = args = kwds = None
+ completed += 1
+ util.debug('worker exiting after %d tasks' % completed)
+
+def _helper_reraises_exception(ex):
+ 'Pickle-able helper function for use by _guarded_task_generation.'
+ raise ex
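+
+# The maxtasks parameter above is what Pool(maxtasksperchild=N) feeds in.
+# A sketch, assuming a picklable top-level function f:
+#
+#     with Pool(processes=2, maxtasksperchild=2) as p:
+#         p.map(f, range(8))   # each worker exits after 2 tasks and is
+#                              # replaced by Pool._maintain_pool()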
+
+#
+# Class representing a process pool
+#
+
+class _PoolCache(dict):
+ """
+ Class that implements a cache for the Pool class that will notify
+ the pool management threads every time the cache is emptied. The
+ notification is done by the use of a queue that is provided when
+ instantiating the cache.
+ """
+ def __init__(self, /, *args, notifier=None, **kwds):
+ self.notifier = notifier
+ super().__init__(*args, **kwds)
+
+ def __delitem__(self, item):
+ super().__delitem__(item)
+
+        # Notify that the cache is empty. This is important because the
+        # pool keeps maintaining workers until the cache gets drained. This
+        # eliminates a race condition in which a task is finished after
+        # the pool's _handle_workers method has entered another iteration
+        # of the loop. In this situation, the only event that can wake up
+        # the pool is the cache being emptied (no more tasks available).
+ if not self:
+ self.notifier.put(None)
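+
+# A sketch of the notification contract (the queue module is already
+# imported above):
+#
+#     q = queue.SimpleQueue()
+#     cache = _PoolCache(notifier=q)
+#     cache[0] = object()
+#     del cache[0]     # cache becomes empty, so q.get() returns None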
+
+class Pool(object):
+ '''
+ Class which supports an async version of applying functions to arguments.
+ '''
+ _wrap_exception = True
+
+ @staticmethod
+ def Process(ctx, *args, **kwds):
+ return ctx.Process(*args, **kwds)
+
+ def __init__(self, processes=None, initializer=None, initargs=(),
+ maxtasksperchild=None, context=None):
+ # Attributes initialized early to make sure that they exist in
+ # __del__() if __init__() raises an exception
+ self._pool = []
+ self._state = INIT
+
+ self._ctx = context or get_context()
+ self._setup_queues()
+ self._taskqueue = queue.SimpleQueue()
+        # The _change_notifier queue exists to wake up self._handle_workers()
+ # when the cache (self._cache) is empty or when there is a change in
+ # the _state variable of the thread that runs _handle_workers.
+ self._change_notifier = self._ctx.SimpleQueue()
+ self._cache = _PoolCache(notifier=self._change_notifier)
+ self._maxtasksperchild = maxtasksperchild
+ self._initializer = initializer
+ self._initargs = initargs
+
+ if processes is None:
+ processes = os.cpu_count() or 1
+ if processes < 1:
+ raise ValueError("Number of processes must be at least 1")
+ if maxtasksperchild is not None:
+ if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
+ raise ValueError("maxtasksperchild must be a positive int or None")
+
+ if initializer is not None and not callable(initializer):
+ raise TypeError('initializer must be a callable')
+
+ self._processes = processes
+ try:
+ self._repopulate_pool()
+ except Exception:
+ for p in self._pool:
+ if p.exitcode is None:
+ p.terminate()
+ for p in self._pool:
+ p.join()
+ raise
+
+ sentinels = self._get_sentinels()
+
+ self._worker_handler = threading.Thread(
+ target=Pool._handle_workers,
+ args=(self._cache, self._taskqueue, self._ctx, self.Process,
+ self._processes, self._pool, self._inqueue, self._outqueue,
+ self._initializer, self._initargs, self._maxtasksperchild,
+ self._wrap_exception, sentinels, self._change_notifier)
+ )
+ self._worker_handler.daemon = True
+ self._worker_handler._state = RUN
+ self._worker_handler.start()
+
+
+ self._task_handler = threading.Thread(
+ target=Pool._handle_tasks,
+ args=(self._taskqueue, self._quick_put, self._outqueue,
+ self._pool, self._cache)
+ )
+ self._task_handler.daemon = True
+ self._task_handler._state = RUN
+ self._task_handler.start()
+
+ self._result_handler = threading.Thread(
+ target=Pool._handle_results,
+ args=(self._outqueue, self._quick_get, self._cache)
+ )
+ self._result_handler.daemon = True
+ self._result_handler._state = RUN
+ self._result_handler.start()
+
+ self._terminate = util.Finalize(
+ self, self._terminate_pool,
+ args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
+ self._change_notifier, self._worker_handler, self._task_handler,
+ self._result_handler, self._cache),
+ exitpriority=15
+ )
+ self._state = RUN
+
+ # Copy globals as function locals to make sure that they are available
+ # during Python shutdown when the Pool is destroyed.
+ def __del__(self, _warn=warnings.warn, RUN=RUN):
+ if self._state == RUN:
+ _warn(f"unclosed running multiprocessing pool {self!r}",
+ ResourceWarning, source=self)
+ if getattr(self, '_change_notifier', None) is not None:
+ self._change_notifier.put(None)
+
+ def __repr__(self):
+ cls = self.__class__
+ return (f'<{cls.__module__}.{cls.__qualname__} '
+ f'state={self._state} '
+ f'pool_size={len(self._pool)}>')
+
+ def _get_sentinels(self):
+ task_queue_sentinels = [self._outqueue._reader]
+ self_notifier_sentinels = [self._change_notifier._reader]
+ return [*task_queue_sentinels, *self_notifier_sentinels]
+
+ @staticmethod
+ def _get_worker_sentinels(workers):
+ return [worker.sentinel for worker in
+ workers if hasattr(worker, "sentinel")]
+
+ @staticmethod
+ def _join_exited_workers(pool):
+ """Cleanup after any worker processes which have exited due to reaching
+ their specified lifetime. Returns True if any workers were cleaned up.
+ """
+ cleaned = False
+ for i in reversed(range(len(pool))):
+ worker = pool[i]
+ if worker.exitcode is not None:
+ # worker exited
+ util.debug('cleaning up worker %d' % i)
+ worker.join()
+ cleaned = True
+ del pool[i]
+ return cleaned
+
+ def _repopulate_pool(self):
+ return self._repopulate_pool_static(self._ctx, self.Process,
+ self._processes,
+ self._pool, self._inqueue,
+ self._outqueue, self._initializer,
+ self._initargs,
+ self._maxtasksperchild,
+ self._wrap_exception)
+
+ @staticmethod
+ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception):
+ """Bring the number of pool processes up to the specified number,
+ for use after reaping workers which have exited.
+ """
+ for i in range(processes - len(pool)):
+ w = Process(ctx, target=worker,
+ args=(inqueue, outqueue,
+ initializer,
+ initargs, maxtasksperchild,
+ wrap_exception))
+ w.name = w.name.replace('Process', 'PoolWorker')
+ w.daemon = True
+ w.start()
+ pool.append(w)
+ util.debug('added worker')
+
+ @staticmethod
+ def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
+ initializer, initargs, maxtasksperchild,
+ wrap_exception):
+ """Clean up any exited workers and start replacements for them.
+ """
+ if Pool._join_exited_workers(pool):
+ Pool._repopulate_pool_static(ctx, Process, processes, pool,
+ inqueue, outqueue, initializer,
+ initargs, maxtasksperchild,
+ wrap_exception)
+
+ def _setup_queues(self):
+ self._inqueue = self._ctx.SimpleQueue()
+ self._outqueue = self._ctx.SimpleQueue()
+ self._quick_put = self._inqueue._writer.send
+ self._quick_get = self._outqueue._reader.recv
+
+ def _check_running(self):
+ if self._state != RUN:
+ raise ValueError("Pool not running")
+
+ def apply(self, func, args=(), kwds={}):
+ '''
+ Equivalent of `func(*args, **kwds)`.
+ Pool must be running.
+ '''
+ return self.apply_async(func, args, kwds).get()
+
+ def map(self, func, iterable, chunksize=None):
+ '''
+ Apply `func` to each element in `iterable`, collecting the results
+ in a list that is returned.
+ '''
+ return self._map_async(func, iterable, mapstar, chunksize).get()
+
+ def starmap(self, func, iterable, chunksize=None):
+ '''
+        Like the `map()` method, but the elements of the `iterable` are
+        expected to be iterables as well and will be unpacked as arguments.
+        Hence an element `(a, b)` becomes the call `func(a, b)`.
+ '''
+ return self._map_async(func, iterable, starmapstar, chunksize).get()
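+
+    # For example (a sketch): pool.starmap(pow, [(2, 3), (3, 2)]) == [8, 9]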
+
+ def starmap_async(self, func, iterable, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Asynchronous version of `starmap()` method.
+ '''
+ return self._map_async(func, iterable, starmapstar, chunksize,
+ callback, error_callback)
+
+ def _guarded_task_generation(self, result_job, func, iterable):
+ '''Provides a generator of tasks for imap and imap_unordered with
+ appropriate handling for iterables which throw exceptions during
+ iteration.'''
+ try:
+ i = -1
+ for i, x in enumerate(iterable):
+ yield (result_job, i, func, (x,), {})
+ except Exception as e:
+ yield (result_job, i+1, _helper_reraises_exception, (e,), {})
+
+ def imap(self, func, iterable, chunksize=1):
+ '''
+ Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
+ '''
+ self._check_running()
+ if chunksize == 1:
+ result = IMapIterator(self)
+ self._taskqueue.put(
+ (
+ self._guarded_task_generation(result._job, func, iterable),
+ result._set_length
+ ))
+ return result
+ else:
+ if chunksize < 1:
+ raise ValueError(
+ "Chunksize must be 1+, not {0:n}".format(
+ chunksize))
+ task_batches = Pool._get_tasks(func, iterable, chunksize)
+ result = IMapIterator(self)
+ self._taskqueue.put(
+ (
+ self._guarded_task_generation(result._job,
+ mapstar,
+ task_batches),
+ result._set_length
+ ))
+ return (item for chunk in result for item in chunk)
+
+ def imap_unordered(self, func, iterable, chunksize=1):
+ '''
+        Like the `imap()` method, but the ordering of results is arbitrary.
+ '''
+ self._check_running()
+ if chunksize == 1:
+ result = IMapUnorderedIterator(self)
+ self._taskqueue.put(
+ (
+ self._guarded_task_generation(result._job, func, iterable),
+ result._set_length
+ ))
+ return result
+ else:
+ if chunksize < 1:
+ raise ValueError(
+ "Chunksize must be 1+, not {0!r}".format(chunksize))
+ task_batches = Pool._get_tasks(func, iterable, chunksize)
+ result = IMapUnorderedIterator(self)
+ self._taskqueue.put(
+ (
+ self._guarded_task_generation(result._job,
+ mapstar,
+ task_batches),
+ result._set_length
+ ))
+ return (item for chunk in result for item in chunk)
+
+ def apply_async(self, func, args=(), kwds={}, callback=None,
+ error_callback=None):
+ '''
+ Asynchronous version of `apply()` method.
+ '''
+ self._check_running()
+ result = ApplyResult(self, callback, error_callback)
+ self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
+ return result
+
+ def map_async(self, func, iterable, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Asynchronous version of `map()` method.
+ '''
+ return self._map_async(func, iterable, mapstar, chunksize, callback,
+ error_callback)
+
+ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Helper function to implement map, starmap and their async counterparts.
+ '''
+ self._check_running()
+ if not hasattr(iterable, '__len__'):
+ iterable = list(iterable)
+
+ if chunksize is None:
+ chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
+ if extra:
+ chunksize += 1
+ if len(iterable) == 0:
+ chunksize = 0
+
+ task_batches = Pool._get_tasks(func, iterable, chunksize)
+ result = MapResult(self, chunksize, len(iterable), callback,
+ error_callback=error_callback)
+ self._taskqueue.put(
+ (
+ self._guarded_task_generation(result._job,
+ mapper,
+ task_batches),
+ None
+ )
+ )
+ return result
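+
+    # A worked example of the default chunksize heuristic above: with
+    # len(iterable) == 100 and 4 workers, divmod(100, 16) == (6, 4), so
+    # chunksize becomes 7 and the input is sent as 15 chunked tasks.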
+
+ @staticmethod
+ def _wait_for_updates(sentinels, change_notifier, timeout=None):
+ wait(sentinels, timeout=timeout)
+ while not change_notifier.empty():
+ change_notifier.get()
+
+ @classmethod
+ def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
+ pool, inqueue, outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception, sentinels,
+ change_notifier):
+ thread = threading.current_thread()
+
+ # Keep maintaining workers until the cache gets drained, unless the pool
+ # is terminated.
+ while thread._state == RUN or (cache and thread._state != TERMINATE):
+ cls._maintain_pool(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception)
+
+ current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
+
+ cls._wait_for_updates(current_sentinels, change_notifier)
+ # send sentinel to stop workers
+ taskqueue.put(None)
+ util.debug('worker handler exiting')
+
+ @staticmethod
+ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
+ thread = threading.current_thread()
+
+ for taskseq, set_length in iter(taskqueue.get, None):
+ task = None
+ try:
+ # iterating taskseq cannot fail
+ for task in taskseq:
+ if thread._state != RUN:
+ util.debug('task handler found thread._state != RUN')
+ break
+ try:
+ put(task)
+ except Exception as e:
+ job, idx = task[:2]
+ try:
+ cache[job]._set(idx, (False, e))
+ except KeyError:
+ pass
+ else:
+ if set_length:
+ util.debug('doing set_length()')
+ idx = task[1] if task else -1
+ set_length(idx + 1)
+ continue
+ break
+ finally:
+ task = taskseq = job = None
+ else:
+ util.debug('task handler got sentinel')
+
+ try:
+ # tell result handler to finish when cache is empty
+ util.debug('task handler sending sentinel to result handler')
+ outqueue.put(None)
+
+ # tell workers there is no more work
+ util.debug('task handler sending sentinel to workers')
+ for p in pool:
+ put(None)
+ except OSError:
+ util.debug('task handler got OSError when sending sentinels')
+
+ util.debug('task handler exiting')
+
+ @staticmethod
+ def _handle_results(outqueue, get, cache):
+ thread = threading.current_thread()
+
+ while 1:
+ try:
+ task = get()
+ except (OSError, EOFError):
+ util.debug('result handler got EOFError/OSError -- exiting')
+ return
+
+ if thread._state != RUN:
+ assert thread._state == TERMINATE, "Thread not in TERMINATE"
+ util.debug('result handler found thread._state=TERMINATE')
+ break
+
+ if task is None:
+ util.debug('result handler got sentinel')
+ break
+
+ job, i, obj = task
+ try:
+ cache[job]._set(i, obj)
+ except KeyError:
+ pass
+ task = job = obj = None
+
+ while cache and thread._state != TERMINATE:
+ try:
+ task = get()
+ except (OSError, EOFError):
+ util.debug('result handler got EOFError/OSError -- exiting')
+ return
+
+ if task is None:
+ util.debug('result handler ignoring extra sentinel')
+ continue
+ job, i, obj = task
+ try:
+ cache[job]._set(i, obj)
+ except KeyError:
+ pass
+ task = job = obj = None
+
+ if hasattr(outqueue, '_reader'):
+ util.debug('ensuring that outqueue is not full')
+ # If we don't make room available in outqueue then
+ # attempts to add the sentinel (None) to outqueue may
+ # block. There is guaranteed to be no more than 2 sentinels.
+ try:
+ for i in range(10):
+ if not outqueue._reader.poll():
+ break
+ get()
+ except (OSError, EOFError):
+ pass
+
+ util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
+ len(cache), thread._state)
+
+ @staticmethod
+ def _get_tasks(func, it, size):
+ it = iter(it)
+ while 1:
+ x = tuple(itertools.islice(it, size))
+ if not x:
+ return
+ yield (func, x)
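+
+    # e.g. (a sketch) list(Pool._get_tasks(str, range(5), 2)) ==
+    #     [(str, (0, 1)), (str, (2, 3)), (str, (4,))]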
+
+ def __reduce__(self):
+ raise NotImplementedError(
+ 'pool objects cannot be passed between processes or pickled'
+ )
+
+ def close(self):
+ util.debug('closing pool')
+ if self._state == RUN:
+ self._state = CLOSE
+ self._worker_handler._state = CLOSE
+ self._change_notifier.put(None)
+
+ def terminate(self):
+ util.debug('terminating pool')
+ self._state = TERMINATE
+ self._terminate()
+
+ def join(self):
+ util.debug('joining pool')
+ if self._state == RUN:
+ raise ValueError("Pool is still running")
+ elif self._state not in (CLOSE, TERMINATE):
+ raise ValueError("In unknown state")
+ self._worker_handler.join()
+ self._task_handler.join()
+ self._result_handler.join()
+ for p in self._pool:
+ p.join()
+
+ @staticmethod
+ def _help_stuff_finish(inqueue, task_handler, size):
+ # task_handler may be blocked trying to put items on inqueue
+ util.debug('removing tasks from inqueue until task handler finished')
+ inqueue._rlock.acquire()
+ while task_handler.is_alive() and inqueue._reader.poll():
+ inqueue._reader.recv()
+ time.sleep(0)
+
+ @classmethod
+ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
+ worker_handler, task_handler, result_handler, cache):
+ # this is guaranteed to only be called once
+ util.debug('finalizing pool')
+
+ # Notify that the worker_handler state has been changed so the
+ # _handle_workers loop can be unblocked (and exited) in order to
+        # send the finalization sentinel to all the workers.
+ worker_handler._state = TERMINATE
+ change_notifier.put(None)
+
+ task_handler._state = TERMINATE
+
+ util.debug('helping task handler/workers to finish')
+ cls._help_stuff_finish(inqueue, task_handler, len(pool))
+
+ if (not result_handler.is_alive()) and (len(cache) != 0):
+ raise AssertionError(
+ "Cannot have cache with result_handler not alive")
+
+ result_handler._state = TERMINATE
+ change_notifier.put(None)
+ outqueue.put(None) # sentinel
+
+ # We must wait for the worker handler to exit before terminating
+ # workers because we don't want workers to be restarted behind our back.
+ util.debug('joining worker handler')
+ if threading.current_thread() is not worker_handler:
+ worker_handler.join()
+
+ # Terminate workers which haven't already finished.
+ if pool and hasattr(pool[0], 'terminate'):
+ util.debug('terminating workers')
+ for p in pool:
+ if p.exitcode is None:
+ p.terminate()
+
+ util.debug('joining task handler')
+ if threading.current_thread() is not task_handler:
+ task_handler.join()
+
+ util.debug('joining result handler')
+ if threading.current_thread() is not result_handler:
+ result_handler.join()
+
+ if pool and hasattr(pool[0], 'terminate'):
+ util.debug('joining pool workers')
+ for p in pool:
+ if p.is_alive():
+ # worker has not yet exited
+ util.debug('cleaning up worker %d' % p.pid)
+ p.join()
+
+ def __enter__(self):
+ self._check_running()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.terminate()
+
+#
+# Class whose instances are returned by `Pool.apply_async()`
+#
+
+class ApplyResult(object):
+
+ def __init__(self, pool, callback, error_callback):
+ self._pool = pool
+ self._event = threading.Event()
+ self._job = next(job_counter)
+ self._cache = pool._cache
+ self._callback = callback
+ self._error_callback = error_callback
+ self._cache[self._job] = self
+
+ def ready(self):
+ return self._event.is_set()
+
+ def successful(self):
+ if not self.ready():
+ raise ValueError("{0!r} not ready".format(self))
+ return self._success
+
+ def wait(self, timeout=None):
+ self._event.wait(timeout)
+
+ def get(self, timeout=None):
+ self.wait(timeout)
+ if not self.ready():
+ raise TimeoutError
+ if self._success:
+ return self._value
+ else:
+ raise self._value
+
+ def _set(self, i, obj):
+ self._success, self._value = obj
+ if self._callback and self._success:
+ self._callback(self._value)
+ if self._error_callback and not self._success:
+ self._error_callback(self._value)
+ self._event.set()
+ del self._cache[self._job]
+ self._pool = None
+
+ __class_getitem__ = classmethod(types.GenericAlias)
+
+AsyncResult = ApplyResult # create alias -- see #17805
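+
+# Typical use, as a sketch (pool, slow_func and arg are illustrative):
+#
+#     res = pool.apply_async(slow_func, (arg,))
+#     try:
+#         value = res.get(timeout=5)   # TimeoutError if not ready in 5s
+#     except TimeoutError:
+#         ...                          # the job may still finish later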
+
+#
+# Class whose instances are returned by `Pool.map_async()`
+#
+
+class MapResult(ApplyResult):
+
+ def __init__(self, pool, chunksize, length, callback, error_callback):
+ ApplyResult.__init__(self, pool, callback,
+ error_callback=error_callback)
+ self._success = True
+ self._value = [None] * length
+ self._chunksize = chunksize
+ if chunksize <= 0:
+ self._number_left = 0
+ self._event.set()
+ del self._cache[self._job]
+ else:
+ self._number_left = length//chunksize + bool(length % chunksize)
+
+ def _set(self, i, success_result):
+ self._number_left -= 1
+ success, result = success_result
+ if success and self._success:
+ self._value[i*self._chunksize:(i+1)*self._chunksize] = result
+ if self._number_left == 0:
+ if self._callback:
+ self._callback(self._value)
+ del self._cache[self._job]
+ self._event.set()
+ self._pool = None
+ else:
+ if not success and self._success:
+ # only store first exception
+ self._success = False
+ self._value = result
+ if self._number_left == 0:
+ # only consider the result ready once all jobs are done
+ if self._error_callback:
+ self._error_callback(self._value)
+ del self._cache[self._job]
+ self._event.set()
+ self._pool = None
+
+#
+# Class whose instances are returned by `Pool.imap()`
+#
+
+class IMapIterator(object):
+
+ def __init__(self, pool):
+ self._pool = pool
+ self._cond = threading.Condition(threading.Lock())
+ self._job = next(job_counter)
+ self._cache = pool._cache
+ self._items = collections.deque()
+ self._index = 0
+ self._length = None
+ self._unsorted = {}
+ self._cache[self._job] = self
+
+ def __iter__(self):
+ return self
+
+ def next(self, timeout=None):
+ with self._cond:
+ try:
+ item = self._items.popleft()
+ except IndexError:
+ if self._index == self._length:
+ self._pool = None
+ raise StopIteration from None
+ self._cond.wait(timeout)
+ try:
+ item = self._items.popleft()
+ except IndexError:
+ if self._index == self._length:
+ self._pool = None
+ raise StopIteration from None
+ raise TimeoutError from None
+
+ success, value = item
+ if success:
+ return value
+ raise value
+
+ __next__ = next # XXX
+
+ def _set(self, i, obj):
+ with self._cond:
+ if self._index == i:
+ self._items.append(obj)
+ self._index += 1
+ while self._index in self._unsorted:
+ obj = self._unsorted.pop(self._index)
+ self._items.append(obj)
+ self._index += 1
+ self._cond.notify()
+ else:
+ self._unsorted[i] = obj
+
+ if self._index == self._length:
+ del self._cache[self._job]
+ self._pool = None
+
+ def _set_length(self, length):
+ with self._cond:
+ self._length = length
+ if self._index == self._length:
+ self._cond.notify()
+ del self._cache[self._job]
+ self._pool = None
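+
+    # A sketch of the ordering logic in _set(): if chunk 1 arrives before
+    # chunk 0, it is parked in self._unsorted; once chunk 0 arrives, both
+    # are appended to self._items in index order and the waiting consumer
+    # in next() is notified.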
+
+#
+# Class whose instances are returned by `Pool.imap_unordered()`
+#
+
+class IMapUnorderedIterator(IMapIterator):
+
+ def _set(self, i, obj):
+ with self._cond:
+ self._items.append(obj)
+ self._index += 1
+ self._cond.notify()
+ if self._index == self._length:
+ del self._cache[self._job]
+ self._pool = None
+
+#
+#
+#
+
+class ThreadPool(Pool):
+ _wrap_exception = False
+
+ @staticmethod
+ def Process(ctx, *args, **kwds):
+ from .dummy import Process
+ return Process(*args, **kwds)
+
+ def __init__(self, processes=None, initializer=None, initargs=()):
+ Pool.__init__(self, processes, initializer, initargs)
+
+ def _setup_queues(self):
+ self._inqueue = queue.SimpleQueue()
+ self._outqueue = queue.SimpleQueue()
+ self._quick_put = self._inqueue.put
+ self._quick_get = self._outqueue.get
+
+ def _get_sentinels(self):
+ return [self._change_notifier._reader]
+
+ @staticmethod
+ def _get_worker_sentinels(workers):
+ return []
+
+ @staticmethod
+ def _help_stuff_finish(inqueue, task_handler, size):
+ # drain inqueue, and put sentinels at its head to make workers finish
+ try:
+ while True:
+ inqueue.get(block=False)
+ except queue.Empty:
+ pass
+ for i in range(size):
+ inqueue.put(None)
+
+ def _wait_for_updates(self, sentinels, change_notifier, timeout):
+ time.sleep(timeout)
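+
+# A minimal ThreadPool usage sketch (mirrors Pool, but with threads):
+#
+#     with ThreadPool(4) as tp:
+#         tp.map(len, ['a', 'bb', 'ccc'])   # -> [1, 2, 3]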
diff --git a/contrib/tools/python3/Lib/multiprocessing/popen_fork.py b/contrib/tools/python3/Lib/multiprocessing/popen_fork.py
new file mode 100644
index 0000000000..625981cf47
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/popen_fork.py
@@ -0,0 +1,83 @@
+import os
+import signal
+
+from . import util
+
+__all__ = ['Popen']
+
+#
+# Start child process using fork
+#
+
+class Popen(object):
+ method = 'fork'
+
+ def __init__(self, process_obj):
+ util._flush_std_streams()
+ self.returncode = None
+ self.finalizer = None
+ self._launch(process_obj)
+
+ def duplicate_for_child(self, fd):
+ return fd
+
+ def poll(self, flag=os.WNOHANG):
+ if self.returncode is None:
+ try:
+ pid, sts = os.waitpid(self.pid, flag)
+ except OSError:
+ # Child process not yet created. See #1731717
+ # e.errno == errno.ECHILD == 10
+ return None
+ if pid == self.pid:
+ self.returncode = os.waitstatus_to_exitcode(sts)
+ return self.returncode
+
+ def wait(self, timeout=None):
+ if self.returncode is None:
+ if timeout is not None:
+ from multiprocessing.connection import wait
+ if not wait([self.sentinel], timeout):
+ return None
+ # This shouldn't block if wait() returned successfully.
+ return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+ return self.returncode
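+
+    # A sketch of the sentinel contract used above (wait() comes from
+    # multiprocessing.connection; p is a started Popen):
+    #
+    #     if wait([p.sentinel], timeout=5):
+    #         code = p.poll()   # read end became ready, so the child exited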
+
+ def _send_signal(self, sig):
+ if self.returncode is None:
+ try:
+ os.kill(self.pid, sig)
+ except ProcessLookupError:
+ pass
+ except OSError:
+ if self.wait(timeout=0.1) is None:
+ raise
+
+ def terminate(self):
+ self._send_signal(signal.SIGTERM)
+
+ def kill(self):
+ self._send_signal(signal.SIGKILL)
+
+ def _launch(self, process_obj):
+ code = 1
+ parent_r, child_w = os.pipe()
+ child_r, parent_w = os.pipe()
+ self.pid = os.fork()
+ if self.pid == 0:
+ try:
+ os.close(parent_r)
+ os.close(parent_w)
+ code = process_obj._bootstrap(parent_sentinel=child_r)
+ finally:
+ os._exit(code)
+ else:
+ os.close(child_w)
+ os.close(child_r)
+ self.finalizer = util.Finalize(self, util.close_fds,
+ (parent_r, parent_w,))
+ self.sentinel = parent_r
+
+ def close(self):
+ if self.finalizer is not None:
+ self.finalizer()
diff --git a/contrib/tools/python3/Lib/multiprocessing/popen_forkserver.py b/contrib/tools/python3/Lib/multiprocessing/popen_forkserver.py
new file mode 100644
index 0000000000..a56eb9bf11
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/popen_forkserver.py
@@ -0,0 +1,74 @@
+import io
+import os
+
+from .context import reduction, set_spawning_popen
+if not reduction.HAVE_SEND_HANDLE:
+ raise ImportError('No support for sending fds between processes')
+from . import forkserver
+from . import popen_fork
+from . import spawn
+from . import util
+
+
+__all__ = ['Popen']
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+ def __init__(self, ind):
+ self.ind = ind
+ def detach(self):
+ return forkserver.get_inherited_fds()[self.ind]
+
+#
+# Start child process using a server process
+#
+
+class Popen(popen_fork.Popen):
+ method = 'forkserver'
+ DupFd = _DupFd
+
+ def __init__(self, process_obj):
+ self._fds = []
+ super().__init__(process_obj)
+
+ def duplicate_for_child(self, fd):
+ self._fds.append(fd)
+ return len(self._fds) - 1
+
+ def _launch(self, process_obj):
+ prep_data = spawn.get_preparation_data(process_obj._name)
+ buf = io.BytesIO()
+ set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, buf)
+ reduction.dump(process_obj, buf)
+ finally:
+ set_spawning_popen(None)
+
+ self.sentinel, w = forkserver.connect_to_new_process(self._fds)
+        # Keep a duplicate of the data pipe's write end as a sentinel of
+        # the parent process, for use by the child process.
+ _parent_w = os.dup(w)
+ self.finalizer = util.Finalize(self, util.close_fds,
+ (_parent_w, self.sentinel))
+ with open(w, 'wb', closefd=True) as f:
+ f.write(buf.getbuffer())
+ self.pid = forkserver.read_signed(self.sentinel)
+
+ def poll(self, flag=os.WNOHANG):
+ if self.returncode is None:
+ from multiprocessing.connection import wait
+ timeout = 0 if flag == os.WNOHANG else None
+ if not wait([self.sentinel], timeout):
+ return None
+ try:
+ self.returncode = forkserver.read_signed(self.sentinel)
+ except (OSError, EOFError):
+            # This should not usually happen, but perhaps the forkserver
+            # process itself got killed.
+ self.returncode = 255
+
+ return self.returncode
diff --git a/contrib/tools/python3/Lib/multiprocessing/popen_spawn_posix.py b/contrib/tools/python3/Lib/multiprocessing/popen_spawn_posix.py
new file mode 100644
index 0000000000..24b8634523
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/popen_spawn_posix.py
@@ -0,0 +1,72 @@
+import io
+import os
+
+from .context import reduction, set_spawning_popen
+from . import popen_fork
+from . import spawn
+from . import util
+
+__all__ = ['Popen']
+
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+ def __init__(self, fd):
+ self.fd = fd
+ def detach(self):
+ return self.fd
+
+#
+# Start child process using a fresh interpreter
+#
+
+class Popen(popen_fork.Popen):
+ method = 'spawn'
+ DupFd = _DupFd
+
+ def __init__(self, process_obj):
+ self._fds = []
+ super().__init__(process_obj)
+
+ def duplicate_for_child(self, fd):
+ self._fds.append(fd)
+ return fd
+
+ def _launch(self, process_obj):
+ from . import resource_tracker
+ tracker_fd = resource_tracker.getfd()
+ self._fds.append(tracker_fd)
+ prep_data = spawn.get_preparation_data(process_obj._name)
+ fp = io.BytesIO()
+ set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, fp)
+ reduction.dump(process_obj, fp)
+ finally:
+ set_spawning_popen(None)
+
+ parent_r = child_w = child_r = parent_w = None
+ try:
+ parent_r, child_w = os.pipe()
+ child_r, parent_w = os.pipe()
+ cmd = spawn.get_command_line(tracker_fd=tracker_fd,
+ pipe_handle=child_r)
+ self._fds.extend([child_r, child_w])
+ self.pid = util.spawnv_passfds(spawn.get_executable(),
+ cmd, self._fds)
+ self.sentinel = parent_r
+ with open(parent_w, 'wb', closefd=False) as f:
+ f.write(fp.getbuffer())
+ finally:
+ fds_to_close = []
+ for fd in (parent_r, parent_w):
+ if fd is not None:
+ fds_to_close.append(fd)
+ self.finalizer = util.Finalize(self, util.close_fds, fds_to_close)
+
+ for fd in (child_r, child_w):
+ if fd is not None:
+ os.close(fd)
diff --git a/contrib/tools/python3/Lib/multiprocessing/popen_spawn_win32.py b/contrib/tools/python3/Lib/multiprocessing/popen_spawn_win32.py
new file mode 100644
index 0000000000..2640086124
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/popen_spawn_win32.py
@@ -0,0 +1,146 @@
+import os
+import msvcrt
+import signal
+import sys
+import _winapi
+
+from .context import reduction, get_spawning_popen, set_spawning_popen
+from . import spawn
+from . import util
+
+__all__ = ['Popen']
+
+#
+#
+#
+
+# Exit code used by Popen.terminate()
+TERMINATE = 0x10000
+WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+
+def _path_eq(p1, p2):
+ return p1 == p2 or os.path.normcase(p1) == os.path.normcase(p2)
+
+WINENV = not _path_eq(sys.executable, sys._base_executable)
+
+
+def _close_handles(*handles):
+ for handle in handles:
+ _winapi.CloseHandle(handle)
+
+
+#
+# We define a Popen class similar to the one from subprocess, but
+# whose constructor takes a process object as its argument.
+#
+
+class Popen(object):
+ '''
+ Start a subprocess to run the code of a process object
+ '''
+ method = 'spawn'
+
+ def __init__(self, process_obj):
+ prep_data = spawn.get_preparation_data(process_obj._name)
+
+ # read end of pipe will be duplicated by the child process
+ # -- see spawn_main() in spawn.py.
+ #
+ # bpo-33929: Previously, the read end of pipe was "stolen" by the child
+ # process, but it leaked a handle if the child process had been
+ # terminated before it could steal the handle from the parent process.
+ rhandle, whandle = _winapi.CreatePipe(None, 0)
+ wfd = msvcrt.open_osfhandle(whandle, 0)
+ cmd = spawn.get_command_line(parent_pid=os.getpid(),
+ pipe_handle=rhandle)
+
+ python_exe = spawn.get_executable()
+
+ # bpo-35797: When running in a venv, we bypass the redirect
+ # executor and launch our base Python.
+ if WINENV and _path_eq(python_exe, sys.executable):
+ cmd[0] = python_exe = sys._base_executable
+ env = os.environ.copy()
+ env["__PYVENV_LAUNCHER__"] = sys.executable
+ else:
+ env = os.environ.copy()
+ env['Y_PYTHON_ENTRY_POINT'] = ':main'
+
+ cmd = ' '.join('"%s"' % x for x in cmd)
+
+ with open(wfd, 'wb', closefd=True) as to_child:
+ # start process
+ try:
+ hp, ht, pid, tid = _winapi.CreateProcess(
+ python_exe, cmd,
+ None, None, False, 0, env, None, None)
+ _winapi.CloseHandle(ht)
+ except:
+ _winapi.CloseHandle(rhandle)
+ raise
+
+ # set attributes of self
+ self.pid = pid
+ self.returncode = None
+ self._handle = hp
+ self.sentinel = int(hp)
+ self.finalizer = util.Finalize(self, _close_handles,
+ (self.sentinel, int(rhandle)))
+
+ # send information to child
+ set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, to_child)
+ reduction.dump(process_obj, to_child)
+ finally:
+ set_spawning_popen(None)
+
+ def duplicate_for_child(self, handle):
+ assert self is get_spawning_popen()
+ return reduction.duplicate(handle, self.sentinel)
+
+ def wait(self, timeout=None):
+ if self.returncode is not None:
+ return self.returncode
+
+ if timeout is None:
+ msecs = _winapi.INFINITE
+ else:
+ msecs = max(0, int(timeout * 1000 + 0.5))
+
+ res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+ if res == _winapi.WAIT_OBJECT_0:
+ code = _winapi.GetExitCodeProcess(self._handle)
+ if code == TERMINATE:
+ code = -signal.SIGTERM
+ self.returncode = code
+
+ return self.returncode
+
+ def poll(self):
+ return self.wait(timeout=0)
+
+ def terminate(self):
+ if self.returncode is not None:
+ return
+
+ try:
+ _winapi.TerminateProcess(int(self._handle), TERMINATE)
+ except PermissionError:
+ # ERROR_ACCESS_DENIED (winerror 5) is received when the
+ # process already died.
+ code = _winapi.GetExitCodeProcess(int(self._handle))
+ if code == _winapi.STILL_ACTIVE:
+ raise
+
+ # gh-113009: Don't set self.returncode. Even if GetExitCodeProcess()
+ # returns an exit code different than STILL_ACTIVE, the process can
+ # still be running. Only set self.returncode once WaitForSingleObject()
+ # returns WAIT_OBJECT_0 in wait().
+
+ kill = terminate
+
+ def close(self):
+ self.finalizer()
diff --git a/contrib/tools/python3/Lib/multiprocessing/process.py b/contrib/tools/python3/Lib/multiprocessing/process.py
new file mode 100644
index 0000000000..271ba3fd32
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/process.py
@@ -0,0 +1,439 @@
+#
+# Module providing the `Process` class which emulates `threading.Thread`
+#
+# multiprocessing/process.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+__all__ = ['BaseProcess', 'current_process', 'active_children',
+ 'parent_process']
+
+#
+# Imports
+#
+
+import os
+import sys
+import signal
+import itertools
+import threading
+from _weakrefset import WeakSet
+
+#
+#
+#
+
+try:
+ ORIGINAL_DIR = os.path.abspath(os.getcwd())
+except OSError:
+ ORIGINAL_DIR = None
+
+#
+# Public functions
+#
+
+def current_process():
+ '''
+ Return process object representing the current process
+ '''
+ return _current_process
+
+def active_children():
+ '''
+ Return list of process objects corresponding to live child processes
+ '''
+ _cleanup()
+ return list(_children)
+
+
+def parent_process():
+ '''
+ Return process object representing the parent process
+ '''
+ return _parent_process
+
+#
+#
+#
+
+def _cleanup():
+ # check for processes which have finished
+ for p in list(_children):
+ if (child_popen := p._popen) and child_popen.poll() is not None:
+ _children.discard(p)
+
+#
+# The `Process` class
+#
+
+class BaseProcess(object):
+ '''
+ Process objects represent activity that is run in a separate process
+
+ The class is analogous to `threading.Thread`
+ '''
+ def _Popen(self):
+ raise NotImplementedError
+
+ def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
+ *, daemon=None):
+ assert group is None, 'group argument must be None for now'
+ count = next(_process_counter)
+ self._identity = _current_process._identity + (count,)
+ self._config = _current_process._config.copy()
+ self._parent_pid = os.getpid()
+ self._parent_name = _current_process.name
+ self._popen = None
+ self._closed = False
+ self._target = target
+ self._args = tuple(args)
+ self._kwargs = dict(kwargs)
+ self._name = name or type(self).__name__ + '-' + \
+ ':'.join(str(i) for i in self._identity)
+ if daemon is not None:
+ self.daemon = daemon
+ _dangling.add(self)
+
+ def _check_closed(self):
+ if self._closed:
+ raise ValueError("process object is closed")
+
+ def run(self):
+ '''
+ Method to be run in sub-process; can be overridden in sub-class
+ '''
+ if self._target:
+ self._target(*self._args, **self._kwargs)
+
+ def start(self):
+ '''
+ Start child process
+ '''
+ self._check_closed()
+ assert self._popen is None, 'cannot start a process twice'
+ assert self._parent_pid == os.getpid(), \
+ 'can only start a process object created by current process'
+ assert not _current_process._config.get('daemon'), \
+ 'daemonic processes are not allowed to have children'
+ _cleanup()
+ self._popen = self._Popen(self)
+ self._sentinel = self._popen.sentinel
+ # Avoid a refcycle if the target function holds an indirect
+ # reference to the process object (see bpo-30775)
+ del self._target, self._args, self._kwargs
+ _children.add(self)
+
+ def terminate(self):
+ '''
+ Terminate process; sends SIGTERM signal or uses TerminateProcess()
+ '''
+ self._check_closed()
+ self._popen.terminate()
+
+ def kill(self):
+ '''
+ Terminate process; sends SIGKILL signal or uses TerminateProcess()
+ '''
+ self._check_closed()
+ self._popen.kill()
+
+ def join(self, timeout=None):
+ '''
+ Wait until child process terminates
+ '''
+ self._check_closed()
+ assert self._parent_pid == os.getpid(), 'can only join a child process'
+ assert self._popen is not None, 'can only join a started process'
+ res = self._popen.wait(timeout)
+ if res is not None:
+ _children.discard(self)
+
+ def is_alive(self):
+ '''
+ Return whether process is alive
+ '''
+ self._check_closed()
+ if self is _current_process:
+ return True
+ assert self._parent_pid == os.getpid(), 'can only test a child process'
+
+ if self._popen is None:
+ return False
+
+ returncode = self._popen.poll()
+ if returncode is None:
+ return True
+ else:
+ _children.discard(self)
+ return False
+
+ def close(self):
+ '''
+ Close the Process object.
+
+ This method releases resources held by the Process object. It is
+ an error to call this method if the child process is still running.
+ '''
+ if self._popen is not None:
+ if self._popen.poll() is None:
+ raise ValueError("Cannot close a process while it is still running. "
+ "You should first call join() or terminate().")
+ self._popen.close()
+ self._popen = None
+ del self._sentinel
+ _children.discard(self)
+ self._closed = True
+
+ @property
+ def name(self):
+ return self._name
+
+ @name.setter
+ def name(self, name):
+ assert isinstance(name, str), 'name must be a string'
+ self._name = name
+
+ @property
+ def daemon(self):
+ '''
+ Return whether process is a daemon
+ '''
+ return self._config.get('daemon', False)
+
+ @daemon.setter
+ def daemon(self, daemonic):
+ '''
+ Set whether process is a daemon
+ '''
+ assert self._popen is None, 'process has already started'
+ self._config['daemon'] = daemonic
+
+ @property
+ def authkey(self):
+ return self._config['authkey']
+
+ @authkey.setter
+ def authkey(self, authkey):
+ '''
+ Set authorization key of process
+ '''
+ self._config['authkey'] = AuthenticationString(authkey)
+
+ @property
+ def exitcode(self):
+ '''
+ Return exit code of process or `None` if it has yet to stop
+ '''
+ self._check_closed()
+ if self._popen is None:
+ return self._popen
+ return self._popen.poll()
+
+ @property
+ def ident(self):
+ '''
+ Return identifier (PID) of process or `None` if it has yet to start
+ '''
+ self._check_closed()
+ if self is _current_process:
+ return os.getpid()
+ else:
+ return self._popen and self._popen.pid
+
+ pid = ident
+
+ @property
+ def sentinel(self):
+ '''
+ Return a file descriptor (Unix) or handle (Windows) suitable for
+ waiting for process termination.
+ '''
+ self._check_closed()
+ try:
+ return self._sentinel
+ except AttributeError:
+ raise ValueError("process not started") from None
+
+ def __repr__(self):
+ exitcode = None
+ if self is _current_process:
+ status = 'started'
+ elif self._closed:
+ status = 'closed'
+ elif self._parent_pid != os.getpid():
+ status = 'unknown'
+ elif self._popen is None:
+ status = 'initial'
+ else:
+ exitcode = self._popen.poll()
+ if exitcode is not None:
+ status = 'stopped'
+ else:
+ status = 'started'
+
+ info = [type(self).__name__, 'name=%r' % self._name]
+ if self._popen is not None:
+ info.append('pid=%s' % self._popen.pid)
+ info.append('parent=%s' % self._parent_pid)
+ info.append(status)
+ if exitcode is not None:
+ exitcode = _exitcode_to_name.get(exitcode, exitcode)
+ info.append('exitcode=%s' % exitcode)
+ if self.daemon:
+ info.append('daemon')
+ return '<%s>' % ' '.join(info)
+
+ ##
+
+ def _bootstrap(self, parent_sentinel=None):
+ from . import util, context
+ global _current_process, _parent_process, _process_counter, _children
+
+ try:
+ if self._start_method is not None:
+ context._force_start_method(self._start_method)
+ _process_counter = itertools.count(1)
+ _children = set()
+ util._close_stdin()
+ old_process = _current_process
+ _current_process = self
+ _parent_process = _ParentProcess(
+ self._parent_name, self._parent_pid, parent_sentinel)
+ if threading._HAVE_THREAD_NATIVE_ID:
+ threading.main_thread()._set_native_id()
+ try:
+ self._after_fork()
+ finally:
+ # delay finalization of the old process object until after
+ # _run_after_forkers() is executed
+ del old_process
+ util.info('child process calling self.run()')
+ try:
+ self.run()
+ exitcode = 0
+ finally:
+ util._exit_function()
+ except SystemExit as e:
+ if e.code is None:
+ exitcode = 0
+ elif isinstance(e.code, int):
+ exitcode = e.code
+ else:
+ sys.stderr.write(str(e.code) + '\n')
+ exitcode = 1
+ except:
+ exitcode = 1
+ import traceback
+ sys.stderr.write('Process %s:\n' % self.name)
+ traceback.print_exc()
+ finally:
+ threading._shutdown()
+ util.info('process exiting with exitcode %d' % exitcode)
+ util._flush_std_streams()
+
+ return exitcode
+
+ @staticmethod
+ def _after_fork():
+ from . import util
+ util._finalizer_registry.clear()
+ util._run_after_forkers()
+
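+# A minimal lifecycle sketch; concrete subclasses are supplied by the
+# start-method contexts (e.g. multiprocessing.Process):
+#
+#     def f(x):
+#         print(x)
+#
+#     if __name__ == '__main__':
+#         p = multiprocessing.Process(target=f, args=(1,))
+#         p.start()
+#         p.join()
+#         assert p.exitcode == 0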
+
+#
+# We subclass bytes to avoid accidental transmission of auth keys over the network
+#
+
+class AuthenticationString(bytes):
+ def __reduce__(self):
+ from .context import get_spawning_popen
+ if get_spawning_popen() is None:
+ raise TypeError(
+ 'Pickling an AuthenticationString object is '
+ 'disallowed for security reasons'
+ )
+ return AuthenticationString, (bytes(self),)
+
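+# For example (a sketch): pickle.dumps(current_process().authkey) raises
+# TypeError unless it happens while a child process is being spawned.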
+
+#
+# Create object representing the parent process
+#
+
+class _ParentProcess(BaseProcess):
+
+ def __init__(self, name, pid, sentinel):
+ self._identity = ()
+ self._name = name
+ self._pid = pid
+ self._parent_pid = None
+ self._popen = None
+ self._closed = False
+ self._sentinel = sentinel
+ self._config = {}
+
+ def is_alive(self):
+ from multiprocessing.connection import wait
+ return not wait([self._sentinel], timeout=0)
+
+ @property
+ def ident(self):
+ return self._pid
+
+ def join(self, timeout=None):
+ '''
+ Wait until parent process terminates
+ '''
+ from multiprocessing.connection import wait
+ wait([self._sentinel], timeout=timeout)
+
+ pid = ident
+
+#
+# Create object representing the main process
+#
+
+class _MainProcess(BaseProcess):
+
+ def __init__(self):
+ self._identity = ()
+ self._name = 'MainProcess'
+ self._parent_pid = None
+ self._popen = None
+ self._closed = False
+ self._config = {'authkey': AuthenticationString(os.urandom(32)),
+ 'semprefix': '/mp'}
+ # Note that some versions of FreeBSD only allow named
+ # semaphores to have names of up to 14 characters. Therefore
+ # we choose a short prefix.
+ #
+ # On MacOSX in a sandbox it may be necessary to use a
+ # different prefix -- see #19478.
+ #
+ # Everything in self._config will be inherited by descendant
+ # processes.
+
+ def close(self):
+ pass
+
+
+_parent_process = None
+_current_process = _MainProcess()
+_process_counter = itertools.count(1)
+_children = set()
+del _MainProcess
+
+#
+# Give names to some return codes
+#
+
+_exitcode_to_name = {}
+
+for name, signum in list(signal.__dict__.items()):
+ if name[:3]=='SIG' and '_' not in name:
+ _exitcode_to_name[-signum] = f'-{name}'
+del name, signum
+
+# For debug and leak testing
+_dangling = WeakSet()
diff --git a/contrib/tools/python3/Lib/multiprocessing/queues.py b/contrib/tools/python3/Lib/multiprocessing/queues.py
new file mode 100644
index 0000000000..852ae87b27
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/queues.py
@@ -0,0 +1,401 @@
+#
+# Module implementing queues
+#
+# multiprocessing/queues.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
+
+import sys
+import os
+import threading
+import collections
+import time
+import types
+import weakref
+import errno
+
+from queue import Empty, Full
+
+import _multiprocessing
+
+from . import connection
+from . import context
+_ForkingPickler = context.reduction.ForkingPickler
+
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+
+#
+# Queue type using a pipe, buffer and thread
+#
+
+class Queue(object):
+
+ def __init__(self, maxsize=0, *, ctx):
+ if maxsize <= 0:
+ # Can raise ImportError (see issues #3770 and #23400)
+ from .synchronize import SEM_VALUE_MAX as maxsize
+ self._maxsize = maxsize
+ self._reader, self._writer = connection.Pipe(duplex=False)
+ self._rlock = ctx.Lock()
+ self._opid = os.getpid()
+ if sys.platform == 'win32':
+ self._wlock = None
+ else:
+ self._wlock = ctx.Lock()
+ self._sem = ctx.BoundedSemaphore(maxsize)
+ # For use by concurrent.futures
+ self._ignore_epipe = False
+ self._reset()
+
+ if sys.platform != 'win32':
+ register_after_fork(self, Queue._after_fork)
+
+ def __getstate__(self):
+ context.assert_spawning(self)
+ return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
+ self._rlock, self._wlock, self._sem, self._opid)
+
+ def __setstate__(self, state):
+ (self._ignore_epipe, self._maxsize, self._reader, self._writer,
+ self._rlock, self._wlock, self._sem, self._opid) = state
+ self._reset()
+
+ def _after_fork(self):
+ debug('Queue._after_fork()')
+ self._reset(after_fork=True)
+
+ def _reset(self, after_fork=False):
+ if after_fork:
+ self._notempty._at_fork_reinit()
+ else:
+ self._notempty = threading.Condition(threading.Lock())
+ self._buffer = collections.deque()
+ self._thread = None
+ self._jointhread = None
+ self._joincancelled = False
+ self._closed = False
+ self._close = None
+ self._send_bytes = self._writer.send_bytes
+ self._recv_bytes = self._reader.recv_bytes
+ self._poll = self._reader.poll
+
+ def put(self, obj, block=True, timeout=None):
+ if self._closed:
+ raise ValueError(f"Queue {self!r} is closed")
+ if not self._sem.acquire(block, timeout):
+ raise Full
+
+ with self._notempty:
+ if self._thread is None:
+ self._start_thread()
+ self._buffer.append(obj)
+ self._notempty.notify()
+
+ def get(self, block=True, timeout=None):
+ if self._closed:
+ raise ValueError(f"Queue {self!r} is closed")
+ if block and timeout is None:
+ with self._rlock:
+ res = self._recv_bytes()
+ self._sem.release()
+ else:
+ if block:
+ deadline = time.monotonic() + timeout
+ if not self._rlock.acquire(block, timeout):
+ raise Empty
+ try:
+ if block:
+ timeout = deadline - time.monotonic()
+ if not self._poll(timeout):
+ raise Empty
+ elif not self._poll():
+ raise Empty
+ res = self._recv_bytes()
+ self._sem.release()
+ finally:
+ self._rlock.release()
+        # deserialize the data after having released the lock
+ return _ForkingPickler.loads(res)
+
+ def qsize(self):
+ # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
+ return self._maxsize - self._sem._semlock._get_value()
+
+ def empty(self):
+ return not self._poll()
+
+ def full(self):
+ return self._sem._semlock._is_zero()
+
+ def get_nowait(self):
+ return self.get(False)
+
+ def put_nowait(self, obj):
+ return self.put(obj, False)
+
+ def close(self):
+ self._closed = True
+ close = self._close
+ if close:
+ self._close = None
+ close()
+
+ def join_thread(self):
+ debug('Queue.join_thread()')
+ assert self._closed, "Queue {0!r} not closed".format(self)
+ if self._jointhread:
+ self._jointhread()
+
+ def cancel_join_thread(self):
+ debug('Queue.cancel_join_thread()')
+ self._joincancelled = True
+ try:
+ self._jointhread.cancel()
+ except AttributeError:
+ pass
+
+ def _terminate_broken(self):
+ # Close a Queue on error.
+
+ # gh-94777: Prevent queue writing to a pipe which is no longer read.
+ self._reader.close()
+
+ # gh-107219: Close the connection writer which can unblock
+ # Queue._feed() if it was stuck in send_bytes().
+ if sys.platform == 'win32':
+ self._writer.close()
+
+ self.close()
+ self.join_thread()
+
+ def _start_thread(self):
+ debug('Queue._start_thread()')
+
+ # Start thread which transfers data from buffer to pipe
+ self._buffer.clear()
+ self._thread = threading.Thread(
+ target=Queue._feed,
+ args=(self._buffer, self._notempty, self._send_bytes,
+ self._wlock, self._reader.close, self._writer.close,
+ self._ignore_epipe, self._on_queue_feeder_error,
+ self._sem),
+ name='QueueFeederThread',
+ daemon=True,
+ )
+
+ try:
+ debug('doing self._thread.start()')
+ self._thread.start()
+ debug('... done self._thread.start()')
+ except:
+ # gh-109047: During Python finalization, creating a thread
+ # can fail with RuntimeError.
+ self._thread = None
+ raise
+
+ if not self._joincancelled:
+ self._jointhread = Finalize(
+ self._thread, Queue._finalize_join,
+ [weakref.ref(self._thread)],
+ exitpriority=-5
+ )
+
+ # Send sentinel to the thread queue object when garbage collected
+ self._close = Finalize(
+ self, Queue._finalize_close,
+ [self._buffer, self._notempty],
+ exitpriority=10
+ )
+
+ @staticmethod
+ def _finalize_join(twr):
+ debug('joining queue thread')
+ thread = twr()
+ if thread is not None:
+ thread.join()
+ debug('... queue thread joined')
+ else:
+ debug('... queue thread already dead')
+
+ @staticmethod
+ def _finalize_close(buffer, notempty):
+ debug('telling queue thread to quit')
+ with notempty:
+ buffer.append(_sentinel)
+ notempty.notify()
+
+ @staticmethod
+ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
+ writer_close, ignore_epipe, onerror, queue_sem):
+ debug('starting thread to feed data to pipe')
+ nacquire = notempty.acquire
+ nrelease = notempty.release
+ nwait = notempty.wait
+ bpopleft = buffer.popleft
+ sentinel = _sentinel
+ if sys.platform != 'win32':
+ wacquire = writelock.acquire
+ wrelease = writelock.release
+ else:
+ wacquire = None
+
+ while 1:
+ try:
+ nacquire()
+ try:
+ if not buffer:
+ nwait()
+ finally:
+ nrelease()
+ try:
+ while 1:
+ obj = bpopleft()
+ if obj is sentinel:
+ debug('feeder thread got sentinel -- exiting')
+ reader_close()
+ writer_close()
+ return
+
+ # serialize the data before acquiring the lock
+ obj = _ForkingPickler.dumps(obj)
+ if wacquire is None:
+ send_bytes(obj)
+ else:
+ wacquire()
+ try:
+ send_bytes(obj)
+ finally:
+ wrelease()
+ except IndexError:
+ pass
+ except Exception as e:
+ if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+ return
+ # Since this runs in a daemon thread the resources it uses
+                # may become unusable while the process is cleaning up.
+                # We ignore errors which happen after the process has
+                # started to clean up.
+ if is_exiting():
+ info('error in queue thread: %s', e)
+ return
+ else:
+ # Since the object has not been sent in the queue, we need
+ # to decrease the size of the queue. The error acts as
+ # if the object had been silently removed from the queue
+ # and this step is necessary to have a properly working
+ # queue.
+ queue_sem.release()
+ onerror(e, obj)
+
+ @staticmethod
+ def _on_queue_feeder_error(e, obj):
+ """
+ Private API hook called when feeding data in the background thread
+ raises an exception. For overriding by concurrent.futures.
+ """
+ import traceback
+ traceback.print_exc()
+
+ __class_getitem__ = classmethod(types.GenericAlias)
+
+
+_sentinel = object()
+
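+# A usage sketch; in user code the queue normally comes from the top-level
+# API, which supplies the ctx argument:
+#
+#     q = multiprocessing.Queue()    # roughly get_context().Queue()
+#     q.put({'a': 1})
+#     q.get()                        # -> {'a': 1}
+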
+#
+# A queue type which also supports join() and task_done() methods
+#
+# Note that if you do not call task_done() for each finished task then
+# eventually the counter's semaphore may overflow causing Bad Things
+# to happen.
+#
+
+class JoinableQueue(Queue):
+
+ def __init__(self, maxsize=0, *, ctx):
+ Queue.__init__(self, maxsize, ctx=ctx)
+ self._unfinished_tasks = ctx.Semaphore(0)
+ self._cond = ctx.Condition()
+
+ def __getstate__(self):
+ return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
+
+ def __setstate__(self, state):
+ Queue.__setstate__(self, state[:-2])
+ self._cond, self._unfinished_tasks = state[-2:]
+
+ def put(self, obj, block=True, timeout=None):
+ if self._closed:
+ raise ValueError(f"Queue {self!r} is closed")
+ if not self._sem.acquire(block, timeout):
+ raise Full
+
+ with self._notempty, self._cond:
+ if self._thread is None:
+ self._start_thread()
+ self._buffer.append(obj)
+ self._unfinished_tasks.release()
+ self._notempty.notify()
+
+ def task_done(self):
+ with self._cond:
+ if not self._unfinished_tasks.acquire(False):
+ raise ValueError('task_done() called too many times')
+ if self._unfinished_tasks._semlock._is_zero():
+ self._cond.notify_all()
+
+ def join(self):
+ with self._cond:
+ if not self._unfinished_tasks._semlock._is_zero():
+ self._cond.wait()
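+
+    # A producer/consumer sketch (names illustrative): every get() must be
+    # balanced by one task_done(), or join() never returns --
+    #
+    #     jq = multiprocessing.JoinableQueue()
+    #     jq.put('work')             # producer side
+    #     item = jq.get()            # consumer side
+    #     jq.task_done()
+    #     jq.join()                  # returns once all items are done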
+
+#
+# Simplified Queue type -- really just a locked pipe
+#
+
+class SimpleQueue(object):
+
+ def __init__(self, *, ctx):
+ self._reader, self._writer = connection.Pipe(duplex=False)
+ self._rlock = ctx.Lock()
+ self._poll = self._reader.poll
+ if sys.platform == 'win32':
+ self._wlock = None
+ else:
+ self._wlock = ctx.Lock()
+
+ def close(self):
+ self._reader.close()
+ self._writer.close()
+
+ def empty(self):
+ return not self._poll()
+
+ def __getstate__(self):
+ context.assert_spawning(self)
+ return (self._reader, self._writer, self._rlock, self._wlock)
+
+ def __setstate__(self, state):
+ (self._reader, self._writer, self._rlock, self._wlock) = state
+ self._poll = self._reader.poll
+
+ def get(self):
+ with self._rlock:
+ res = self._reader.recv_bytes()
+        # deserialize the data after having released the lock
+ return _ForkingPickler.loads(res)
+
+ def put(self, obj):
+ # serialize the data before acquiring the lock
+ obj = _ForkingPickler.dumps(obj)
+ if self._wlock is None:
+            # writes to a message-oriented win32 pipe are atomic
+ self._writer.send_bytes(obj)
+ else:
+ with self._wlock:
+ self._writer.send_bytes(obj)
+
+ __class_getitem__ = classmethod(types.GenericAlias)
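A short sketch of SimpleQueue in use (the worker function is hypothetical). Note the symmetry with the code above: put() pickles before taking the write lock and get() unpickles after releasing the read lock, so serialization never happens while a lock is held:

    import multiprocessing as mp

    def worker(q):
        q.put('hello from the child')   # pickled, then sent under the write lock

    if __name__ == '__main__':
        q = mp.SimpleQueue()
        p = mp.Process(target=worker, args=(q,))
        p.start()
        print(q.get())                  # blocks until the child's bytes arrive
        p.join()
        q.close()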
diff --git a/contrib/tools/python3/Lib/multiprocessing/reduction.py b/contrib/tools/python3/Lib/multiprocessing/reduction.py
new file mode 100644
index 0000000000..5593f0682f
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/reduction.py
@@ -0,0 +1,281 @@
+#
+# Module which deals with pickling of objects.
+#
+# multiprocessing/reduction.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+from abc import ABCMeta
+import copyreg
+import functools
+import io
+import os
+import pickle
+import socket
+import sys
+
+from . import context
+
+__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
+
+
+HAVE_SEND_HANDLE = (sys.platform == 'win32' or
+ (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS') and
+ hasattr(socket.socket, 'sendmsg')))
+
+#
+# Pickler subclass
+#
+
+class ForkingPickler(pickle.Pickler):
+ '''Pickler subclass used by multiprocessing.'''
+ _extra_reducers = {}
+ _copyreg_dispatch_table = copyreg.dispatch_table
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.dispatch_table = self._copyreg_dispatch_table.copy()
+ self.dispatch_table.update(self._extra_reducers)
+
+ @classmethod
+ def register(cls, type, reduce):
+ '''Register a reduce function for a type.'''
+ cls._extra_reducers[type] = reduce
+
+ @classmethod
+ def dumps(cls, obj, protocol=None):
+ buf = io.BytesIO()
+ cls(buf, protocol).dump(obj)
+ return buf.getbuffer()
+
+ loads = pickle.loads
+
+register = ForkingPickler.register
+
+def dump(obj, file, protocol=None):
+ '''Replacement for pickle.dump() using ForkingPickler.'''
+ ForkingPickler(file, protocol).dump(obj)
+
+#
+# Platform specific definitions
+#
+
+if sys.platform == 'win32':
+ # Windows
+ __all__ += ['DupHandle', 'duplicate', 'steal_handle']
+ import _winapi
+
+ def duplicate(handle, target_process=None, inheritable=False,
+ *, source_process=None):
+ '''Duplicate a handle. (target_process is a handle not a pid!)'''
+ current_process = _winapi.GetCurrentProcess()
+ if source_process is None:
+ source_process = current_process
+ if target_process is None:
+ target_process = current_process
+ return _winapi.DuplicateHandle(
+ source_process, handle, target_process,
+ 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
+
+ def steal_handle(source_pid, handle):
+ '''Steal a handle from process identified by source_pid.'''
+ source_process_handle = _winapi.OpenProcess(
+ _winapi.PROCESS_DUP_HANDLE, False, source_pid)
+ try:
+ return _winapi.DuplicateHandle(
+ source_process_handle, handle,
+ _winapi.GetCurrentProcess(), 0, False,
+ _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(source_process_handle)
+
+ def send_handle(conn, handle, destination_pid):
+ '''Send a handle over a local connection.'''
+ dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
+ conn.send(dh)
+
+ def recv_handle(conn):
+ '''Receive a handle over a local connection.'''
+ return conn.recv().detach()
+
+ class DupHandle(object):
+ '''Picklable wrapper for a handle.'''
+ def __init__(self, handle, access, pid=None):
+ if pid is None:
+ # We just duplicate the handle in the current process and
+ # let the receiving process steal the handle.
+ pid = os.getpid()
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
+ try:
+ self._handle = _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(),
+ handle, proc, access, False, 0)
+ finally:
+ _winapi.CloseHandle(proc)
+ self._access = access
+ self._pid = pid
+
+ def detach(self):
+ '''Get the handle. This should only be called once.'''
+ # retrieve handle from process which currently owns it
+ if self._pid == os.getpid():
+ # The handle has already been duplicated for this process.
+ return self._handle
+ # We must steal the handle from the process whose pid is self._pid.
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
+ self._pid)
+ try:
+ return _winapi.DuplicateHandle(
+ proc, self._handle, _winapi.GetCurrentProcess(),
+ self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(proc)
+
+else:
+ # Unix
+ __all__ += ['DupFd', 'sendfds', 'recvfds']
+ import array
+
+    # On macOS we should acknowledge receipt of fds -- see Issue14669
+ ACKNOWLEDGE = sys.platform == 'darwin'
+
+ def sendfds(sock, fds):
+ '''Send an array of fds over an AF_UNIX socket.'''
+ fds = array.array('i', fds)
+ msg = bytes([len(fds) % 256])
+ sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+ if ACKNOWLEDGE and sock.recv(1) != b'A':
+ raise RuntimeError('did not receive acknowledgement of fd')
+
+ def recvfds(sock, size):
+ '''Receive an array of fds over an AF_UNIX socket.'''
+ a = array.array('i')
+ bytes_size = a.itemsize * size
+ msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
+ if not msg and not ancdata:
+ raise EOFError
+ try:
+ if ACKNOWLEDGE:
+ sock.send(b'A')
+ if len(ancdata) != 1:
+ raise RuntimeError('received %d items of ancdata' %
+ len(ancdata))
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ if len(cmsg_data) % a.itemsize != 0:
+ raise ValueError
+ a.frombytes(cmsg_data)
+ if len(a) % 256 != msg[0]:
+ raise AssertionError(
+ "Len is {0:n} but msg[0] is {1!r}".format(
+ len(a), msg[0]))
+ return list(a)
+ except (ValueError, IndexError):
+ pass
+ raise RuntimeError('Invalid data received')
+
+ def send_handle(conn, handle, destination_pid):
+ '''Send a handle over a local connection.'''
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ sendfds(s, [handle])
+
+ def recv_handle(conn):
+ '''Receive a handle over a local connection.'''
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ return recvfds(s, 1)[0]
+
+ def DupFd(fd):
+ '''Return a wrapper for an fd.'''
+ popen_obj = context.get_spawning_popen()
+ if popen_obj is not None:
+ return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
+ elif HAVE_SEND_HANDLE:
+ from . import resource_sharer
+ return resource_sharer.DupFd(fd)
+ else:
+ raise ValueError('SCM_RIGHTS appears not to be available')
+
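A minimal sketch of passing an open file descriptor with send_handle()/recv_handle() over a Pipe connection; the output file name and the child function are illustrative only:

    import multiprocessing as mp
    import os
    from multiprocessing import reduction

    def child(conn):
        fd = reduction.recv_handle(conn)        # receive the duplicated descriptor
        with os.fdopen(fd, 'w') as f:
            f.write('written through a passed fd\n')

    if __name__ == '__main__':
        parent_conn, child_conn = mp.Pipe()
        p = mp.Process(target=child, args=(child_conn,))
        p.start()
        with open('handle_demo.txt', 'w') as f:   # hypothetical output file
            reduction.send_handle(parent_conn, f.fileno(), p.pid)
        p.join()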
+#
+# Try making some callable types picklable
+#
+
+def _reduce_method(m):
+ if m.__self__ is None:
+ return getattr, (m.__class__, m.__func__.__name__)
+ else:
+ return getattr, (m.__self__, m.__func__.__name__)
+class _C:
+ def f(self):
+ pass
+register(type(_C().f), _reduce_method)
+
+
+def _reduce_method_descriptor(m):
+ return getattr, (m.__objclass__, m.__name__)
+register(type(list.append), _reduce_method_descriptor)
+register(type(int.__add__), _reduce_method_descriptor)
+
+
+def _reduce_partial(p):
+ return _rebuild_partial, (p.func, p.args, p.keywords or {})
+def _rebuild_partial(func, args, keywords):
+ return functools.partial(func, *args, **keywords)
+register(functools.partial, _reduce_partial)
+
+#
+# Make sockets picklable
+#
+
+if sys.platform == 'win32':
+ def _reduce_socket(s):
+ from .resource_sharer import DupSocket
+ return _rebuild_socket, (DupSocket(s),)
+ def _rebuild_socket(ds):
+ return ds.detach()
+ register(socket.socket, _reduce_socket)
+
+else:
+ def _reduce_socket(s):
+ df = DupFd(s.fileno())
+ return _rebuild_socket, (df, s.family, s.type, s.proto)
+ def _rebuild_socket(df, family, type, proto):
+ fd = df.detach()
+ return socket.socket(family, type, proto, fileno=fd)
+ register(socket.socket, _reduce_socket)
+
+
+class AbstractReducer(metaclass=ABCMeta):
+ '''Abstract base class for use in implementing a Reduction class
+ suitable for use in replacing the standard reduction mechanism
+ used in multiprocessing.'''
+ ForkingPickler = ForkingPickler
+ register = register
+ dump = dump
+ send_handle = send_handle
+ recv_handle = recv_handle
+
+ if sys.platform == 'win32':
+ steal_handle = steal_handle
+ duplicate = duplicate
+ DupHandle = DupHandle
+ else:
+ sendfds = sendfds
+ recvfds = recvfds
+ DupFd = DupFd
+
+ _reduce_method = _reduce_method
+ _reduce_method_descriptor = _reduce_method_descriptor
+ _rebuild_partial = _rebuild_partial
+ _reduce_socket = _reduce_socket
+ _rebuild_socket = _rebuild_socket
+
+ def __init__(self, *args):
+ register(type(_C().f), _reduce_method)
+ register(type(list.append), _reduce_method_descriptor)
+ register(type(int.__add__), _reduce_method_descriptor)
+ register(functools.partial, _reduce_partial)
+ register(socket.socket, _reduce_socket)
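To illustrate the hook that ForkingPickler.register() exposes, a sketch with a made-up Token type; the module itself registers reducers for bound methods, partials and sockets in exactly this way:

    from multiprocessing import reduction

    class Token:
        """Toy type used only for this illustration."""
        def __init__(self, value):
            self.value = value

    def _reduce_token(tok):
        # Return (callable, args) so the pickler can rebuild the object.
        return Token, (tok.value,)

    reduction.register(Token, _reduce_token)
    # Token instances sent through queues or pipes now use _reduce_token.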
diff --git a/contrib/tools/python3/Lib/multiprocessing/resource_sharer.py b/contrib/tools/python3/Lib/multiprocessing/resource_sharer.py
new file mode 100644
index 0000000000..b8afb0fbed
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/resource_sharer.py
@@ -0,0 +1,154 @@
+#
+# We use a background thread for sharing fds on Unix, and for sharing sockets on
+# Windows.
+#
+# A client which wants to pickle a resource registers it with the resource
+# sharer and gets an identifier in return. The unpickling process connects
+# to the resource sharer, sends the identifier and its pid, and then receives
+# the resource.
+#
+
+import os
+import signal
+import socket
+import sys
+import threading
+
+from . import process
+from .context import reduction
+from . import util
+
+__all__ = ['stop']
+
+
+if sys.platform == 'win32':
+ __all__ += ['DupSocket']
+
+ class DupSocket(object):
+ '''Picklable wrapper for a socket.'''
+ def __init__(self, sock):
+ new_sock = sock.dup()
+ def send(conn, pid):
+ share = new_sock.share(pid)
+ conn.send_bytes(share)
+ self._id = _resource_sharer.register(send, new_sock.close)
+
+ def detach(self):
+ '''Get the socket. This should only be called once.'''
+ with _resource_sharer.get_connection(self._id) as conn:
+ share = conn.recv_bytes()
+ return socket.fromshare(share)
+
+else:
+ __all__ += ['DupFd']
+
+ class DupFd(object):
+ '''Wrapper for fd which can be used at any time.'''
+ def __init__(self, fd):
+ new_fd = os.dup(fd)
+ def send(conn, pid):
+ reduction.send_handle(conn, new_fd, pid)
+ def close():
+ os.close(new_fd)
+ self._id = _resource_sharer.register(send, close)
+
+ def detach(self):
+ '''Get the fd. This should only be called once.'''
+ with _resource_sharer.get_connection(self._id) as conn:
+ return reduction.recv_handle(conn)
+
+
+class _ResourceSharer(object):
+ '''Manager for resources using background thread.'''
+ def __init__(self):
+ self._key = 0
+ self._cache = {}
+ self._lock = threading.Lock()
+ self._listener = None
+ self._address = None
+ self._thread = None
+ util.register_after_fork(self, _ResourceSharer._afterfork)
+
+ def register(self, send, close):
+ '''Register resource, returning an identifier.'''
+ with self._lock:
+ if self._address is None:
+ self._start()
+ self._key += 1
+ self._cache[self._key] = (send, close)
+ return (self._address, self._key)
+
+ @staticmethod
+ def get_connection(ident):
+ '''Return connection from which to receive identified resource.'''
+ from .connection import Client
+ address, key = ident
+ c = Client(address, authkey=process.current_process().authkey)
+ c.send((key, os.getpid()))
+ return c
+
+ def stop(self, timeout=None):
+ '''Stop the background thread and clear registered resources.'''
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address,
+ authkey=process.current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ util.sub_warning('_ResourceSharer thread did '
+ 'not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
+ def _afterfork(self):
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+ self._lock._at_fork_reinit()
+ if self._listener is not None:
+ self._listener.close()
+ self._listener = None
+ self._address = None
+ self._thread = None
+
+ def _start(self):
+ from .connection import Listener
+ assert self._listener is None, "Already have Listener"
+ util.debug('starting listener and thread for sending handles')
+ self._listener = Listener(authkey=process.current_process().authkey, backlog=128)
+ self._address = self._listener.address
+ t = threading.Thread(target=self._serve)
+ t.daemon = True
+ t.start()
+ self._thread = t
+
+ def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
+ while 1:
+ try:
+ with self._listener.accept() as conn:
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
+ send, close = self._cache.pop(key)
+ try:
+ send(conn, destination_pid)
+ finally:
+ close()
+ except:
+ if not util.is_exiting():
+ sys.excepthook(*sys.exc_info())
+
+
+_resource_sharer = _ResourceSharer()
+stop = _resource_sharer.stop
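The sharer is exercised indirectly whenever a socket or fd has to be pickled rather than inherited. A sketch under the 'spawn' start method on a Unix platform, where the socket argument goes through the DupFd machinery:

    import multiprocessing as mp
    import socket

    def pong(sock):
        sock.sendall(b'pong')
        sock.close()

    if __name__ == '__main__':
        mp.set_start_method('spawn')       # socket must be pickled, not inherited
        a, b = socket.socketpair()
        p = mp.Process(target=pong, args=(b,))
        p.start()                          # b cannot be inherited; it is reduced via DupFd
        b.close()
        print(a.recv(4))                   # b'pong'
        a.close()
        p.join()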
diff --git a/contrib/tools/python3/Lib/multiprocessing/resource_tracker.py b/contrib/tools/python3/Lib/multiprocessing/resource_tracker.py
new file mode 100644
index 0000000000..79e96ecf32
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/resource_tracker.py
@@ -0,0 +1,269 @@
+###############################################################################
+# Server process to keep track of unlinked resources (like shared memory
+# segments, semaphores etc.) and clean them.
+#
+# On Unix we run a server process which keeps track of unlinked
+# resources. The server ignores SIGINT and SIGTERM and reads from a
+# pipe. Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining resource names.
+#
+# This is important because there may be system limits for such resources: for
+# instance, the system only supports a limited number of named semaphores, and
+# shared-memory segments live in RAM. If a Python process leaks such a
+# resource, it will not be removed until the next reboot. Without
+# this resource tracker process, "killall python" would probably leave unlinked
+# resources.
+
+import os
+import signal
+import sys
+import threading
+import warnings
+
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
+_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
+
+_CLEANUP_FUNCS = {
+ 'noop': lambda: None,
+}
+
+if os.name == 'posix':
+ import _multiprocessing
+ import _posixshmem
+
+ # Use sem_unlink() to clean up named semaphores.
+ #
+ # sem_unlink() may be missing if the Python build process detected the
+ # absence of POSIX named semaphores. In that case, no named semaphores were
+ # ever opened, so no cleanup would be necessary.
+ if hasattr(_multiprocessing, 'sem_unlink'):
+ _CLEANUP_FUNCS.update({
+ 'semaphore': _multiprocessing.sem_unlink,
+ })
+ _CLEANUP_FUNCS.update({
+ 'shared_memory': _posixshmem.shm_unlink,
+ })
+
+
+class ReentrantCallError(RuntimeError):
+ pass
+
+
+class ResourceTracker(object):
+
+ def __init__(self):
+ self._lock = threading.RLock()
+ self._fd = None
+ self._pid = None
+
+ def _reentrant_call_error(self):
+ # gh-109629: this happens if an explicit call to the ResourceTracker
+ # gets interrupted by a garbage collection, invoking a finalizer (*)
+ # that itself calls back into ResourceTracker.
+ # (*) for example the SemLock finalizer
+ raise ReentrantCallError(
+ "Reentrant call into the multiprocessing resource tracker")
+
+ def _stop(self):
+ with self._lock:
+ # This should not happen (_stop() isn't called by a finalizer)
+ # but we check for it anyway.
+ if self._lock._recursion_count() > 1:
+ return self._reentrant_call_error()
+ if self._fd is None:
+ # not running
+ return
+
+ # closing the "alive" file descriptor stops main()
+ os.close(self._fd)
+ self._fd = None
+
+ os.waitpid(self._pid, 0)
+ self._pid = None
+
+ def getfd(self):
+ self.ensure_running()
+ return self._fd
+
+ def ensure_running(self):
+ '''Make sure that resource tracker process is running.
+
+ This can be run from any process. Usually a child process will use
+ the resource created by its parent.'''
+ with self._lock:
+ if self._lock._recursion_count() > 1:
+ # The code below is certainly not reentrant-safe, so bail out
+ return self._reentrant_call_error()
+ if self._fd is not None:
+ # resource tracker was launched before, is it still running?
+ if self._check_alive():
+ # => still alive
+ return
+ # => dead, launch it again
+ os.close(self._fd)
+
+ # Clean-up to avoid dangling processes.
+ try:
+ # _pid can be None if this process is a child from another
+ # python process, which has started the resource_tracker.
+ if self._pid is not None:
+ os.waitpid(self._pid, 0)
+ except ChildProcessError:
+ # The resource_tracker has already been terminated.
+ pass
+ self._fd = None
+ self._pid = None
+
+ warnings.warn('resource_tracker: process died unexpectedly, '
+ 'relaunching. Some resources might leak.')
+
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
+ r, w = os.pipe()
+ try:
+ fds_to_pass.append(r)
+            # process will outlive us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd % r]
+ # bpo-33613: Register a signal mask that will block the signals.
+ # This signal mask will be inherited by the child that is going
+ # to be spawned and will protect the child from a race condition
+ # that can make the child die before it registers signal handlers
+ # for SIGINT and SIGTERM. The mask is unregistered after spawning
+ # the child.
+ try:
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ finally:
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
+ except:
+ os.close(w)
+ raise
+ else:
+ self._fd = w
+ self._pid = pid
+ finally:
+ os.close(r)
+
+ def _check_alive(self):
+ '''Check that the pipe has not been closed by sending a probe.'''
+ try:
+ # We cannot use send here as it calls ensure_running, creating
+ # a cycle.
+ os.write(self._fd, b'PROBE:0:noop\n')
+ except OSError:
+ return False
+ else:
+ return True
+
+ def register(self, name, rtype):
+ '''Register name of resource with resource tracker.'''
+ self._send('REGISTER', name, rtype)
+
+ def unregister(self, name, rtype):
+ '''Unregister name of resource with resource tracker.'''
+ self._send('UNREGISTER', name, rtype)
+
+ def _send(self, cmd, name, rtype):
+ try:
+ self.ensure_running()
+ except ReentrantCallError:
+ # The code below might or might not work, depending on whether
+ # the resource tracker was already running and still alive.
+ # Better warn the user.
+ # (XXX is warnings.warn itself reentrant-safe? :-)
+ warnings.warn(
+ f"ResourceTracker called reentrantly for resource cleanup, "
+ f"which is unsupported. "
+ f"The {rtype} object {name!r} might leak.")
+ msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
+ if len(msg) > 512:
+            # POSIX guarantees that writes to a pipe of less than PIPE_BUF
+            # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('msg too long')
+ nbytes = os.write(self._fd, msg)
+ assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
+ nbytes, len(msg))
+
+
+_resource_tracker = ResourceTracker()
+ensure_running = _resource_tracker.ensure_running
+register = _resource_tracker.register
+unregister = _resource_tracker.unregister
+getfd = _resource_tracker.getfd
+
+
+def main(fd):
+ '''Run resource tracker.'''
+ # protect the process from ^C and "killall python" etc
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
+
+ for f in (sys.stdin, sys.stdout):
+ try:
+ f.close()
+ except Exception:
+ pass
+
+ cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
+ try:
+ # keep track of registered/unregistered resources
+ with open(fd, 'rb') as f:
+ for line in f:
+ try:
+ cmd, name, rtype = line.strip().decode('ascii').split(':')
+ cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
+ if cleanup_func is None:
+ raise ValueError(
+ f'Cannot register {name} for automatic cleanup: '
+ f'unknown resource type {rtype}')
+
+ if cmd == 'REGISTER':
+ cache[rtype].add(name)
+ elif cmd == 'UNREGISTER':
+ cache[rtype].remove(name)
+ elif cmd == 'PROBE':
+ pass
+ else:
+ raise RuntimeError('unrecognized command %r' % cmd)
+ except Exception:
+ try:
+ sys.excepthook(*sys.exc_info())
+ except:
+ pass
+ finally:
+ # all processes have terminated; cleanup any remaining resources
+ for rtype, rtype_cache in cache.items():
+ if rtype_cache:
+ try:
+ warnings.warn('resource_tracker: There appear to be %d '
+ 'leaked %s objects to clean up at shutdown' %
+ (len(rtype_cache), rtype))
+ except Exception:
+ pass
+ for name in rtype_cache:
+ # For some reason the process which created and registered this
+ # resource has failed to unregister it. Presumably it has
+ # died. We therefore unlink it.
+ try:
+ try:
+ _CLEANUP_FUNCS[rtype](name)
+ except Exception as e:
+ warnings.warn('resource_tracker: %r: %s' % (name, e))
+ finally:
+ pass
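What the tracker buys you in practice, as a sketch: every SharedMemory block registers itself with rtype 'shared_memory', and unlink() sends the matching UNREGISTER; a block that is never unlinked is reclaimed (with a warning) when the tracker sees EOF at shutdown:

    from multiprocessing import shared_memory

    shm = shared_memory.SharedMemory(create=True, size=64)  # REGISTER sent
    shm.buf[:3] = b'abc'
    shm.close()
    shm.unlink()   # UNREGISTER sent; omit this and the tracker would warn
                   # about (and unlink) a leaked shared_memory object at exit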
diff --git a/contrib/tools/python3/Lib/multiprocessing/shared_memory.py b/contrib/tools/python3/Lib/multiprocessing/shared_memory.py
new file mode 100644
index 0000000000..9a1e5aa17b
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/shared_memory.py
@@ -0,0 +1,534 @@
+"""Provides shared memory for direct access across processes.
+
+The API of this package is currently provisional. Refer to the
+documentation for details.
+"""
+
+
+__all__ = [ 'SharedMemory', 'ShareableList' ]
+
+
+from functools import partial
+import mmap
+import os
+import errno
+import struct
+import secrets
+import types
+
+if os.name == "nt":
+ import _winapi
+ _USE_POSIX = False
+else:
+ import _posixshmem
+ _USE_POSIX = True
+
+from . import resource_tracker
+
+_O_CREX = os.O_CREAT | os.O_EXCL
+
+# FreeBSD (and perhaps other BSDs) limits names to 14 characters.
+_SHM_SAFE_NAME_LENGTH = 14
+
+# Shared memory block name prefix
+if _USE_POSIX:
+ _SHM_NAME_PREFIX = '/psm_'
+else:
+ _SHM_NAME_PREFIX = 'wnsm_'
+
+
+def _make_filename():
+ "Create a random filename for the shared memory object."
+ # number of random bytes to use for name
+ nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2
+ assert nbytes >= 2, '_SHM_NAME_PREFIX too long'
+ name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes)
+ assert len(name) <= _SHM_SAFE_NAME_LENGTH
+ return name
+
+
+class SharedMemory:
+ """Creates a new shared memory block or attaches to an existing
+ shared memory block.
+
+ Every shared memory block is assigned a unique name. This enables
+ one process to create a shared memory block with a particular name
+ so that a different process can attach to that same shared memory
+ block using that same name.
+
+ As a resource for sharing data across processes, shared memory blocks
+ may outlive the original process that created them. When one process
+ no longer needs access to a shared memory block that might still be
+ needed by other processes, the close() method should be called.
+ When a shared memory block is no longer needed by any process, the
+ unlink() method should be called to ensure proper cleanup."""
+
+ # Defaults; enables close() and unlink() to run without errors.
+ _name = None
+ _fd = -1
+ _mmap = None
+ _buf = None
+ _flags = os.O_RDWR
+ _mode = 0o600
+ _prepend_leading_slash = True if _USE_POSIX else False
+
+ def __init__(self, name=None, create=False, size=0):
+ if not size >= 0:
+ raise ValueError("'size' must be a positive integer")
+ if create:
+ self._flags = _O_CREX | os.O_RDWR
+ if size == 0:
+ raise ValueError("'size' must be a positive number different from zero")
+ if name is None and not self._flags & os.O_EXCL:
+ raise ValueError("'name' can only be None if create=True")
+
+ if _USE_POSIX:
+
+ # POSIX Shared Memory
+
+ if name is None:
+ while True:
+ name = _make_filename()
+ try:
+ self._fd = _posixshmem.shm_open(
+ name,
+ self._flags,
+ mode=self._mode
+ )
+ except FileExistsError:
+ continue
+ self._name = name
+ break
+ else:
+ name = "/" + name if self._prepend_leading_slash else name
+ self._fd = _posixshmem.shm_open(
+ name,
+ self._flags,
+ mode=self._mode
+ )
+ self._name = name
+ try:
+ if create and size:
+ os.ftruncate(self._fd, size)
+ stats = os.fstat(self._fd)
+ size = stats.st_size
+ self._mmap = mmap.mmap(self._fd, size)
+ except OSError:
+ self.unlink()
+ raise
+
+ resource_tracker.register(self._name, "shared_memory")
+
+ else:
+
+ # Windows Named Shared Memory
+
+ if create:
+ while True:
+ temp_name = _make_filename() if name is None else name
+ # Create and reserve shared memory block with this name
+ # until it can be attached to by mmap.
+ h_map = _winapi.CreateFileMapping(
+ _winapi.INVALID_HANDLE_VALUE,
+ _winapi.NULL,
+ _winapi.PAGE_READWRITE,
+ (size >> 32) & 0xFFFFFFFF,
+ size & 0xFFFFFFFF,
+ temp_name
+ )
+ try:
+ last_error_code = _winapi.GetLastError()
+ if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
+ if name is not None:
+ raise FileExistsError(
+ errno.EEXIST,
+ os.strerror(errno.EEXIST),
+ name,
+ _winapi.ERROR_ALREADY_EXISTS
+ )
+ else:
+ continue
+ self._mmap = mmap.mmap(-1, size, tagname=temp_name)
+ finally:
+ _winapi.CloseHandle(h_map)
+ self._name = temp_name
+ break
+
+ else:
+ self._name = name
+ # Dynamically determine the existing named shared memory
+            # block's size, which is likely a multiple of mmap.PAGESIZE.
+ h_map = _winapi.OpenFileMapping(
+ _winapi.FILE_MAP_READ,
+ False,
+ name
+ )
+ try:
+ p_buf = _winapi.MapViewOfFile(
+ h_map,
+ _winapi.FILE_MAP_READ,
+ 0,
+ 0,
+ 0
+ )
+ finally:
+ _winapi.CloseHandle(h_map)
+ try:
+ size = _winapi.VirtualQuerySize(p_buf)
+ finally:
+ _winapi.UnmapViewOfFile(p_buf)
+ self._mmap = mmap.mmap(-1, size, tagname=name)
+
+ self._size = size
+ self._buf = memoryview(self._mmap)
+
+ def __del__(self):
+ try:
+ self.close()
+ except OSError:
+ pass
+
+ def __reduce__(self):
+ return (
+ self.__class__,
+ (
+ self.name,
+ False,
+ self.size,
+ ),
+ )
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({self.name!r}, size={self.size})'
+
+ @property
+ def buf(self):
+ "A memoryview of contents of the shared memory block."
+ return self._buf
+
+ @property
+ def name(self):
+ "Unique name that identifies the shared memory block."
+ reported_name = self._name
+ if _USE_POSIX and self._prepend_leading_slash:
+ if self._name.startswith("/"):
+ reported_name = self._name[1:]
+ return reported_name
+
+ @property
+ def size(self):
+ "Size in bytes."
+ return self._size
+
+ def close(self):
+ """Closes access to the shared memory from this instance but does
+ not destroy the shared memory block."""
+ if self._buf is not None:
+ self._buf.release()
+ self._buf = None
+ if self._mmap is not None:
+ self._mmap.close()
+ self._mmap = None
+ if _USE_POSIX and self._fd >= 0:
+ os.close(self._fd)
+ self._fd = -1
+
+ def unlink(self):
+ """Requests that the underlying shared memory block be destroyed.
+
+ In order to ensure proper cleanup of resources, unlink should be
+ called once (and only once) across all processes which have access
+ to the shared memory block."""
+ if _USE_POSIX and self._name:
+ _posixshmem.shm_unlink(self._name)
+ resource_tracker.unregister(self._name, "shared_memory")
+
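A usage sketch of the create/attach split implemented above; the second handle would normally live in another process, reached by sending the block's name:

    from multiprocessing import shared_memory

    src = shared_memory.SharedMemory(create=True, size=16)
    src.buf[:5] = b'hello'

    view = shared_memory.SharedMemory(name=src.name)   # attach by name
    print(bytes(view.buf[:5]))                         # b'hello'

    view.close()     # every handle calls close()
    src.close()
    src.unlink()     # exactly one process calls unlink()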
+
+_encoding = "utf8"
+
+class ShareableList:
+ """Pattern for a mutable list-like object shareable via a shared
+ memory block. It differs from the built-in list type in that these
+    lists cannot change their overall length (i.e. no append, insert,
+ etc.)
+
+ Because values are packed into a memoryview as bytes, the struct
+ packing format for any storable value must require no more than 8
+ characters to describe its format."""
+
+ # The shared memory area is organized as follows:
+ # - 8 bytes: number of items (N) as a 64-bit integer
+ # - (N + 1) * 8 bytes: offsets of each element from the start of the
+ # data area
+ # - K bytes: the data area storing item values (with encoding and size
+ # depending on their respective types)
+ # - N * 8 bytes: `struct` format string for each element
+ # - N bytes: index into _back_transforms_mapping for each element
+ # (for reconstructing the corresponding Python value)
+ _types_mapping = {
+ int: "q",
+ float: "d",
+ bool: "xxxxxxx?",
+ str: "%ds",
+ bytes: "%ds",
+ None.__class__: "xxxxxx?x",
+ }
+ _alignment = 8
+ _back_transforms_mapping = {
+ 0: lambda value: value, # int, float, bool
+ 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str
+ 2: lambda value: value.rstrip(b'\x00'), # bytes
+ 3: lambda _value: None, # None
+ }
+
+ @staticmethod
+ def _extract_recreation_code(value):
+ """Used in concert with _back_transforms_mapping to convert values
+ into the appropriate Python objects when retrieving them from
+ the list as well as when storing them."""
+ if not isinstance(value, (str, bytes, None.__class__)):
+ return 0
+ elif isinstance(value, str):
+ return 1
+ elif isinstance(value, bytes):
+ return 2
+ else:
+ return 3 # NoneType
+
+ def __init__(self, sequence=None, *, name=None):
+ if name is None or sequence is not None:
+ sequence = sequence or ()
+ _formats = [
+ self._types_mapping[type(item)]
+ if not isinstance(item, (str, bytes))
+ else self._types_mapping[type(item)] % (
+ self._alignment * (len(item) // self._alignment + 1),
+ )
+ for item in sequence
+ ]
+ self._list_len = len(_formats)
+ assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len
+ offset = 0
+ # The offsets of each list element into the shared memory's
+ # data area (0 meaning the start of the data area, not the start
+ # of the shared memory area).
+ self._allocated_offsets = [0]
+ for fmt in _formats:
+ offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
+ self._allocated_offsets.append(offset)
+ _recreation_codes = [
+ self._extract_recreation_code(item) for item in sequence
+ ]
+ requested_size = struct.calcsize(
+ "q" + self._format_size_metainfo +
+ "".join(_formats) +
+ self._format_packing_metainfo +
+ self._format_back_transform_codes
+ )
+
+ self.shm = SharedMemory(name, create=True, size=requested_size)
+ else:
+ self.shm = SharedMemory(name)
+
+ if sequence is not None:
+ _enc = _encoding
+ struct.pack_into(
+ "q" + self._format_size_metainfo,
+ self.shm.buf,
+ 0,
+ self._list_len,
+ *(self._allocated_offsets)
+ )
+ struct.pack_into(
+ "".join(_formats),
+ self.shm.buf,
+ self._offset_data_start,
+ *(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
+ )
+ struct.pack_into(
+ self._format_packing_metainfo,
+ self.shm.buf,
+ self._offset_packing_formats,
+ *(v.encode(_enc) for v in _formats)
+ )
+ struct.pack_into(
+ self._format_back_transform_codes,
+ self.shm.buf,
+ self._offset_back_transform_codes,
+ *(_recreation_codes)
+ )
+
+ else:
+ self._list_len = len(self) # Obtains size from offset 0 in buffer.
+ self._allocated_offsets = list(
+ struct.unpack_from(
+ self._format_size_metainfo,
+ self.shm.buf,
+ 1 * 8
+ )
+ )
+
+ def _get_packing_format(self, position):
+ "Gets the packing format for a single value stored in the list."
+ position = position if position >= 0 else position + self._list_len
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ v = struct.unpack_from(
+ "8s",
+ self.shm.buf,
+ self._offset_packing_formats + position * 8
+ )[0]
+ fmt = v.rstrip(b'\x00')
+ fmt_as_str = fmt.decode(_encoding)
+
+ return fmt_as_str
+
+ def _get_back_transform(self, position):
+ "Gets the back transformation function for a single value."
+
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ transform_code = struct.unpack_from(
+ "b",
+ self.shm.buf,
+ self._offset_back_transform_codes + position
+ )[0]
+ transform_function = self._back_transforms_mapping[transform_code]
+
+ return transform_function
+
+ def _set_packing_format_and_transform(self, position, fmt_as_str, value):
+ """Sets the packing format and back transformation code for a
+ single value in the list at the specified position."""
+
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ struct.pack_into(
+ "8s",
+ self.shm.buf,
+ self._offset_packing_formats + position * 8,
+ fmt_as_str.encode(_encoding)
+ )
+
+ transform_code = self._extract_recreation_code(value)
+ struct.pack_into(
+ "b",
+ self.shm.buf,
+ self._offset_back_transform_codes + position,
+ transform_code
+ )
+
+ def __getitem__(self, position):
+ position = position if position >= 0 else position + self._list_len
+ try:
+ offset = self._offset_data_start + self._allocated_offsets[position]
+ (v,) = struct.unpack_from(
+ self._get_packing_format(position),
+ self.shm.buf,
+ offset
+ )
+ except IndexError:
+ raise IndexError("index out of range")
+
+ back_transform = self._get_back_transform(position)
+ v = back_transform(v)
+
+ return v
+
+ def __setitem__(self, position, value):
+ position = position if position >= 0 else position + self._list_len
+ try:
+ item_offset = self._allocated_offsets[position]
+ offset = self._offset_data_start + item_offset
+ current_format = self._get_packing_format(position)
+ except IndexError:
+ raise IndexError("assignment index out of range")
+
+ if not isinstance(value, (str, bytes)):
+ new_format = self._types_mapping[type(value)]
+ encoded_value = value
+ else:
+ allocated_length = self._allocated_offsets[position + 1] - item_offset
+
+ encoded_value = (value.encode(_encoding)
+ if isinstance(value, str) else value)
+ if len(encoded_value) > allocated_length:
+ raise ValueError("bytes/str item exceeds available storage")
+ if current_format[-1] == "s":
+ new_format = current_format
+ else:
+ new_format = self._types_mapping[str] % (
+ allocated_length,
+ )
+
+ self._set_packing_format_and_transform(
+ position,
+ new_format,
+ value
+ )
+ struct.pack_into(new_format, self.shm.buf, offset, encoded_value)
+
+ def __reduce__(self):
+ return partial(self.__class__, name=self.shm.name), ()
+
+ def __len__(self):
+ return struct.unpack_from("q", self.shm.buf, 0)[0]
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'
+
+ @property
+ def format(self):
+ "The struct packing format used by all currently stored items."
+ return "".join(
+ self._get_packing_format(i) for i in range(self._list_len)
+ )
+
+ @property
+ def _format_size_metainfo(self):
+ "The struct packing format used for the items' storage offsets."
+ return "q" * (self._list_len + 1)
+
+ @property
+ def _format_packing_metainfo(self):
+ "The struct packing format used for the items' packing formats."
+ return "8s" * self._list_len
+
+ @property
+ def _format_back_transform_codes(self):
+ "The struct packing format used for the items' back transforms."
+ return "b" * self._list_len
+
+ @property
+ def _offset_data_start(self):
+ # - 8 bytes for the list length
+ # - (N + 1) * 8 bytes for the element offsets
+ return (self._list_len + 2) * 8
+
+ @property
+ def _offset_packing_formats(self):
+ return self._offset_data_start + self._allocated_offsets[-1]
+
+ @property
+ def _offset_back_transform_codes(self):
+ return self._offset_packing_formats + self._list_len * 8
+
+ def count(self, value):
+ "L.count(value) -> integer -- return number of occurrences of value."
+
+ return sum(value == entry for entry in self)
+
+ def index(self, value):
+ """L.index(value) -> integer -- return first index of value.
+ Raises ValueError if the value is not present."""
+
+ for position, entry in enumerate(self):
+ if value == entry:
+ return position
+ else:
+ raise ValueError(f"{value!r} not in this container")
+
+ __class_getitem__ = classmethod(types.GenericAlias)
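A short sketch of the fixed-layout semantics described in the class comment: items may be replaced in place, strings and bytes only up to their originally allocated slot, and the overall length never changes:

    from multiprocessing import shared_memory

    sl = shared_memory.ShareableList([1, 2.0, 'three', b'four', None])
    print(sl[2])        # 'three'
    sl[0] = 42          # same-type replacement is always fine
    sl[2] = 'ten'       # fits: the slot was sized for 'three'
    print(list(sl))     # [42, 2.0, 'ten', b'four', None]

    sl.shm.close()
    sl.shm.unlink()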
diff --git a/contrib/tools/python3/Lib/multiprocessing/sharedctypes.py b/contrib/tools/python3/Lib/multiprocessing/sharedctypes.py
new file mode 100644
index 0000000000..6071707027
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/sharedctypes.py
@@ -0,0 +1,240 @@
+#
+# Module which supports allocation of ctypes objects from shared memory
+#
+# multiprocessing/sharedctypes.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import ctypes
+import weakref
+
+from . import heap
+from . import get_context
+
+from .context import reduction, assert_spawning
+_ForkingPickler = reduction.ForkingPickler
+
+__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
+
+#
+#
+#
+
+typecode_to_type = {
+ 'c': ctypes.c_char, 'u': ctypes.c_wchar,
+ 'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
+ 'h': ctypes.c_short, 'H': ctypes.c_ushort,
+ 'i': ctypes.c_int, 'I': ctypes.c_uint,
+ 'l': ctypes.c_long, 'L': ctypes.c_ulong,
+ 'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong,
+ 'f': ctypes.c_float, 'd': ctypes.c_double
+ }
+
+#
+#
+#
+
+def _new_value(type_):
+ size = ctypes.sizeof(type_)
+ wrapper = heap.BufferWrapper(size)
+ return rebuild_ctype(type_, wrapper, None)
+
+def RawValue(typecode_or_type, *args):
+ '''
+ Returns a ctypes object allocated from shared memory
+ '''
+ type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
+ obj = _new_value(type_)
+ ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
+ obj.__init__(*args)
+ return obj
+
+def RawArray(typecode_or_type, size_or_initializer):
+ '''
+ Returns a ctypes array allocated from shared memory
+ '''
+ type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
+ if isinstance(size_or_initializer, int):
+ type_ = type_ * size_or_initializer
+ obj = _new_value(type_)
+ ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
+ return obj
+ else:
+ type_ = type_ * len(size_or_initializer)
+ result = _new_value(type_)
+ result.__init__(*size_or_initializer)
+ return result
+
+def Value(typecode_or_type, *args, lock=True, ctx=None):
+ '''
+ Return a synchronization wrapper for a Value
+ '''
+ obj = RawValue(typecode_or_type, *args)
+ if lock is False:
+ return obj
+ if lock in (True, None):
+ ctx = ctx or get_context()
+ lock = ctx.RLock()
+ if not hasattr(lock, 'acquire'):
+ raise AttributeError("%r has no method 'acquire'" % lock)
+ return synchronized(obj, lock, ctx=ctx)
+
+def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
+ '''
+ Return a synchronization wrapper for a RawArray
+ '''
+ obj = RawArray(typecode_or_type, size_or_initializer)
+ if lock is False:
+ return obj
+ if lock in (True, None):
+ ctx = ctx or get_context()
+ lock = ctx.RLock()
+ if not hasattr(lock, 'acquire'):
+ raise AttributeError("%r has no method 'acquire'" % lock)
+ return synchronized(obj, lock, ctx=ctx)
+
+def copy(obj):
+ new_obj = _new_value(type(obj))
+ ctypes.pointer(new_obj)[0] = obj
+ return new_obj
+
+def synchronized(obj, lock=None, ctx=None):
+ assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
+ ctx = ctx or get_context()
+
+ if isinstance(obj, ctypes._SimpleCData):
+ return Synchronized(obj, lock, ctx)
+ elif isinstance(obj, ctypes.Array):
+ if obj._type_ is ctypes.c_char:
+ return SynchronizedString(obj, lock, ctx)
+ return SynchronizedArray(obj, lock, ctx)
+ else:
+ cls = type(obj)
+ try:
+ scls = class_cache[cls]
+ except KeyError:
+ names = [field[0] for field in cls._fields_]
+ d = {name: make_property(name) for name in names}
+ classname = 'Synchronized' + cls.__name__
+ scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
+ return scls(obj, lock, ctx)
+
+#
+# Functions for pickling/unpickling
+#
+
+def reduce_ctype(obj):
+ assert_spawning(obj)
+ if isinstance(obj, ctypes.Array):
+ return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
+ else:
+ return rebuild_ctype, (type(obj), obj._wrapper, None)
+
+def rebuild_ctype(type_, wrapper, length):
+ if length is not None:
+ type_ = type_ * length
+ _ForkingPickler.register(type_, reduce_ctype)
+ buf = wrapper.create_memoryview()
+ obj = type_.from_buffer(buf)
+ obj._wrapper = wrapper
+ return obj
+
+#
+# Function to create properties
+#
+
+def make_property(name):
+ try:
+ return prop_cache[name]
+ except KeyError:
+ d = {}
+ exec(template % ((name,)*7), d)
+ prop_cache[name] = d[name]
+ return d[name]
+
+template = '''
+def get%s(self):
+ self.acquire()
+ try:
+ return self._obj.%s
+ finally:
+ self.release()
+def set%s(self, value):
+ self.acquire()
+ try:
+ self._obj.%s = value
+ finally:
+ self.release()
+%s = property(get%s, set%s)
+'''
+
+prop_cache = {}
+class_cache = weakref.WeakKeyDictionary()
+
+#
+# Synchronized wrappers
+#
+
+class SynchronizedBase(object):
+
+ def __init__(self, obj, lock=None, ctx=None):
+ self._obj = obj
+ if lock:
+ self._lock = lock
+ else:
+ ctx = ctx or get_context(force=True)
+ self._lock = ctx.RLock()
+ self.acquire = self._lock.acquire
+ self.release = self._lock.release
+
+ def __enter__(self):
+ return self._lock.__enter__()
+
+ def __exit__(self, *args):
+ return self._lock.__exit__(*args)
+
+ def __reduce__(self):
+ assert_spawning(self)
+ return synchronized, (self._obj, self._lock)
+
+ def get_obj(self):
+ return self._obj
+
+ def get_lock(self):
+ return self._lock
+
+ def __repr__(self):
+ return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
+
+
+class Synchronized(SynchronizedBase):
+ value = make_property('value')
+
+
+class SynchronizedArray(SynchronizedBase):
+
+ def __len__(self):
+ return len(self._obj)
+
+ def __getitem__(self, i):
+ with self:
+ return self._obj[i]
+
+ def __setitem__(self, i, value):
+ with self:
+ self._obj[i] = value
+
+ def __getslice__(self, start, stop):
+ with self:
+ return self._obj[start:stop]
+
+ def __setslice__(self, start, stop, values):
+ with self:
+ self._obj[start:stop] = values
+
+
+class SynchronizedString(SynchronizedArray):
+ value = make_property('value')
+ raw = make_property('raw')
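A sketch of the synchronized wrappers in use: Value('i', 0) allocates a shared c_int through RawValue() and guards it with the context's RLock, so concurrent increments never interleave:

    import multiprocessing as mp

    def bump(counter, n):
        for _ in range(n):
            with counter.get_lock():      # the Synchronized wrapper's RLock
                counter.value += 1

    if __name__ == '__main__':
        counter = mp.Value('i', 0)        # typecode 'i' -> ctypes.c_int
        procs = [mp.Process(target=bump, args=(counter, 1000))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(counter.value)              # 4000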
diff --git a/contrib/tools/python3/Lib/multiprocessing/spawn.py b/contrib/tools/python3/Lib/multiprocessing/spawn.py
new file mode 100644
index 0000000000..a0ab98cfd2
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/spawn.py
@@ -0,0 +1,307 @@
+#
+# Code used to start processes when using the spawn or forkserver
+# start methods.
+#
+# multiprocessing/spawn.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import os
+import sys
+import runpy
+import types
+
+from . import get_start_method, set_start_method
+from . import process
+from .context import reduction
+from . import util
+
+__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
+ 'get_preparation_data', 'get_command_line', 'import_main_path']
+
+#
+# _python_exe is the assumed path to the python executable.
+# People embedding Python want to modify it.
+#
+
+if sys.platform != 'win32':
+ WINEXE = False
+ WINSERVICE = False
+else:
+ WINEXE = getattr(sys, 'frozen', False)
+ WINSERVICE = sys.executable and sys.executable.lower().endswith("pythonservice.exe")
+
+def set_executable(exe):
+ global _python_exe
+ if exe is None:
+ _python_exe = exe
+ elif sys.platform == 'win32':
+ _python_exe = os.fsdecode(exe)
+ else:
+ _python_exe = os.fsencode(exe)
+
+def get_executable():
+ return _python_exe
+
+if WINSERVICE:
+ set_executable(os.path.join(sys.exec_prefix, 'python.exe'))
+else:
+ set_executable(sys.executable)
+
+#
+#
+#
+
+def is_forking(argv):
+ '''
+ Return whether commandline indicates we are forking
+ '''
+ if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
+ return True
+ else:
+ return False
+
+
+def freeze_support():
+ '''
+    Run code for process object if this is not the main process
+ '''
+ if is_forking(sys.argv):
+ kwds = {}
+ for arg in sys.argv[2:]:
+ name, value = arg.split('=')
+ if value == 'None':
+ kwds[name] = None
+ else:
+ kwds[name] = int(value)
+ spawn_main(**kwds)
+ sys.exit()
+
+
+def get_command_line(**kwds):
+ '''
+ Returns prefix of command line used for spawning a child process
+ '''
+ if False and getattr(sys, 'frozen', False):
+ return ([sys.executable, '--multiprocessing-fork'] +
+ ['%s=%r' % item for item in kwds.items()])
+ else:
+ prog = 'from multiprocessing.spawn import spawn_main; spawn_main(%s)'
+ prog %= ', '.join('%s=%r' % item for item in kwds.items())
+ opts = util._args_from_interpreter_flags()
+ exe = get_executable()
+ return [exe] + opts + ['-c', prog, '--multiprocessing-fork']
+
+
+def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
+ '''
+ Run code specified by data received over pipe
+ '''
+ assert is_forking(sys.argv), "Not forking"
+ if sys.platform == 'win32':
+ import msvcrt
+ import _winapi
+
+ if parent_pid is not None:
+ source_process = _winapi.OpenProcess(
+ _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE,
+ False, parent_pid)
+ else:
+ source_process = None
+ new_handle = reduction.duplicate(pipe_handle,
+ source_process=source_process)
+ fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
+ parent_sentinel = source_process
+ else:
+ from . import resource_tracker
+ resource_tracker._resource_tracker._fd = tracker_fd
+ fd = pipe_handle
+ parent_sentinel = os.dup(pipe_handle)
+ exitcode = _main(fd, parent_sentinel)
+ sys.exit(exitcode)
+
+
+def _main(fd, parent_sentinel):
+ with os.fdopen(fd, 'rb', closefd=True) as from_parent:
+ process.current_process()._inheriting = True
+ try:
+ preparation_data = reduction.pickle.load(from_parent)
+ prepare(preparation_data)
+ self = reduction.pickle.load(from_parent)
+ finally:
+ del process.current_process()._inheriting
+ return self._bootstrap(parent_sentinel)
+
+
+def _check_not_importing_main():
+ if getattr(process.current_process(), '_inheriting', False):
+ raise RuntimeError('''
+ An attempt has been made to start a new process before the
+ current process has finished its bootstrapping phase.
+
+ This probably means that you are not using fork to start your
+ child processes and you have forgotten to use the proper idiom
+ in the main module:
+
+ if __name__ == '__main__':
+ freeze_support()
+ ...
+
+ The "freeze_support()" line can be omitted if the program
+ is not going to be frozen to produce an executable.
+
+ To fix this issue, refer to the "Safe importing of main module"
+ section in https://docs.python.org/3/library/multiprocessing.html
+ ''')
+
+
+def get_preparation_data(name):
+ '''
+ Return info about parent needed by child to unpickle process object
+ '''
+ _check_not_importing_main()
+ d = dict(
+ log_to_stderr=util._log_to_stderr,
+ authkey=process.current_process().authkey,
+ )
+
+ if util._logger is not None:
+ d['log_level'] = util._logger.getEffectiveLevel()
+
+ sys_path=sys.path.copy()
+ try:
+ i = sys_path.index('')
+ except ValueError:
+ pass
+ else:
+ sys_path[i] = process.ORIGINAL_DIR
+
+ d.update(
+ name=name,
+ sys_path=sys_path,
+ sys_argv=sys.argv,
+ orig_dir=process.ORIGINAL_DIR,
+ dir=os.getcwd(),
+ start_method=get_start_method(),
+ )
+
+ # Figure out whether to initialise main in the subprocess as a module
+ # or through direct execution (or to leave it alone entirely)
+ main_module = sys.modules['__main__']
+ main_mod_name = getattr(main_module.__spec__, "name", None)
+ if main_mod_name is not None:
+ d['init_main_from_name'] = main_mod_name
+ elif sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
+ main_path = getattr(main_module, '__file__', None)
+ if main_path is not None:
+ if (not os.path.isabs(main_path) and
+ process.ORIGINAL_DIR is not None):
+ main_path = os.path.join(process.ORIGINAL_DIR, main_path)
+ d['init_main_from_path'] = os.path.normpath(main_path)
+
+ return d
+
+#
+# Prepare current process
+#
+
+old_main_modules = []
+
+def prepare(data):
+ '''
+ Try to get current process ready to unpickle process object
+ '''
+ if 'name' in data:
+ process.current_process().name = data['name']
+
+ if 'authkey' in data:
+ process.current_process().authkey = data['authkey']
+
+ if 'log_to_stderr' in data and data['log_to_stderr']:
+ util.log_to_stderr()
+
+ if 'log_level' in data:
+ util.get_logger().setLevel(data['log_level'])
+
+ if 'sys_path' in data:
+ sys.path = data['sys_path']
+
+ if 'sys_argv' in data:
+ sys.argv = data['sys_argv']
+
+ if 'dir' in data:
+ os.chdir(data['dir'])
+
+ if 'orig_dir' in data:
+ process.ORIGINAL_DIR = data['orig_dir']
+
+ if 'start_method' in data:
+ set_start_method(data['start_method'], force=True)
+
+ if 'init_main_from_name' in data:
+ _fixup_main_from_name(data['init_main_from_name'])
+ elif 'init_main_from_path' in data:
+ _fixup_main_from_path(data['init_main_from_path'])
+
+# Multiprocessing module helpers to fix up the main module in
+# spawned subprocesses
+def _fixup_main_from_name(mod_name):
+    # __main__.py files for packages, directories, zip archives, etc., run
+ # their "main only" code unconditionally, so we don't even try to
+ # populate anything in __main__, nor do we make any changes to
+ # __main__ attributes
+ current_main = sys.modules['__main__']
+ if mod_name == "__main__" or mod_name.endswith(".__main__"):
+ return
+
+ # If this process was forked, __main__ may already be populated
+ if getattr(current_main.__spec__, "name", None) == mod_name:
+ return
+
+    # Otherwise, __main__ may contain some non-main code which we need to
+    # support unpickling properly. We rerun it as __mp_main__ and make
+ # the normal __main__ an alias to that
+ old_main_modules.append(current_main)
+ main_module = types.ModuleType("__mp_main__")
+ main_content = runpy.run_module(mod_name,
+ run_name="__mp_main__",
+ alter_sys=True)
+ main_module.__dict__.update(main_content)
+ sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
+
+
+def _fixup_main_from_path(main_path):
+ # If this process was forked, __main__ may already be populated
+ current_main = sys.modules['__main__']
+
+ # Unfortunately, the main ipython launch script historically had no
+ # "if __name__ == '__main__'" guard, so we work around that
+ # by treating it like a __main__.py file
+ # See https://github.com/ipython/ipython/issues/4698
+ main_name = os.path.splitext(os.path.basename(main_path))[0]
+ if main_name == 'ipython':
+ return
+
+ # Otherwise, if __file__ already has the setting we expect,
+ # there's nothing more to do
+ if getattr(current_main, '__file__', None) == main_path:
+ return
+
+ # If the parent process has sent a path through rather than a module
+ # name we assume it is an executable script that may contain
+ # non-main code that needs to be executed
+ old_main_modules.append(current_main)
+ main_module = types.ModuleType("__mp_main__")
+ main_content = runpy.run_path(main_path,
+ run_name="__mp_main__")
+ main_module.__dict__.update(main_content)
+ sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
+
+
+def import_main_path(main_path):
+ '''
+ Set sys.modules['__main__'] to module at main_path
+ '''
+ _fixup_main_from_path(main_path)
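The importable-main idiom that _check_not_importing_main() enforces, as a sketch; under 'spawn' the child re-imports the main module, so process creation must sit behind the __name__ guard:

    import multiprocessing as mp

    def work():
        print('child ran')

    if __name__ == '__main__':
        mp.freeze_support()              # no-op unless frozen into an executable
        mp.set_start_method('spawn')
        p = mp.Process(target=work)      # safe: not re-run on import in the child
        p.start()
        p.join()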
diff --git a/contrib/tools/python3/Lib/multiprocessing/synchronize.py b/contrib/tools/python3/Lib/multiprocessing/synchronize.py
new file mode 100644
index 0000000000..3ccbfe311c
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/synchronize.py
@@ -0,0 +1,404 @@
+#
+# Module implementing synchronization primitives
+#
+# multiprocessing/synchronize.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+__all__ = [
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
+ ]
+
+import threading
+import sys
+import tempfile
+import _multiprocessing
+import time
+
+from . import context
+from . import process
+from . import util
+
+# Try to import the mp.synchronize module cleanly; if it fails,
+# raise ImportError for platforms lacking a working sem_open implementation.
+# See issue 3770
+try:
+ from _multiprocessing import SemLock, sem_unlink
+except ImportError:
+ raise ImportError("This platform lacks a functioning sem_open" +
+ " implementation, therefore, the required" +
+ " synchronization primitives needed will not" +
+ " function, see issue 3770.")
+
+#
+# Constants
+#
+
+RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
+SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
+
+#
+# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
+#
+
+class SemLock(object):
+
+ _rand = tempfile._RandomNameSequence()
+
+ def __init__(self, kind, value, maxvalue, *, ctx):
+ if ctx is None:
+ ctx = context._default_context.get_context()
+ self._is_fork_ctx = ctx.get_start_method() == 'fork'
+ unlink_now = sys.platform == 'win32' or self._is_fork_ctx
+ for i in range(100):
+ try:
+ sl = self._semlock = _multiprocessing.SemLock(
+ kind, value, maxvalue, self._make_name(),
+ unlink_now)
+ except FileExistsError:
+ pass
+ else:
+ break
+ else:
+ raise FileExistsError('cannot find name for semaphore')
+
+ util.debug('created semlock with handle %s' % sl.handle)
+ self._make_methods()
+
+ if sys.platform != 'win32':
+ def _after_fork(obj):
+ obj._semlock._after_fork()
+ util.register_after_fork(self, _after_fork)
+
+ if self._semlock.name is not None:
+ # We only get here if we are on Unix with forking
+ # disabled. When the object is garbage collected or the
+ # process shuts down we unlink the semaphore name
+ from .resource_tracker import register
+ register(self._semlock.name, "semaphore")
+ util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
+ exitpriority=0)
+
+ @staticmethod
+ def _cleanup(name):
+ from .resource_tracker import unregister
+ sem_unlink(name)
+ unregister(name, "semaphore")
+
+ def _make_methods(self):
+ self.acquire = self._semlock.acquire
+ self.release = self._semlock.release
+
+ def __enter__(self):
+ return self._semlock.__enter__()
+
+ def __exit__(self, *args):
+ return self._semlock.__exit__(*args)
+
+ def __getstate__(self):
+ context.assert_spawning(self)
+ sl = self._semlock
+ if sys.platform == 'win32':
+ h = context.get_spawning_popen().duplicate_for_child(sl.handle)
+ else:
+ if self._is_fork_ctx:
+ raise RuntimeError('A SemLock created in a fork context is being '
+ 'shared with a process in a spawn context. This is '
+ 'not supported. Please use the same context to create '
+ 'multiprocessing objects and Process.')
+ h = sl.handle
+ return (h, sl.kind, sl.maxvalue, sl.name)
+
+ def __setstate__(self, state):
+ self._semlock = _multiprocessing.SemLock._rebuild(*state)
+ util.debug('recreated blocker with handle %r' % state[0])
+ self._make_methods()
+ # Ensure that deserialized SemLock can be serialized again (gh-108520).
+ self._is_fork_ctx = False
+
+ @staticmethod
+ def _make_name():
+ return '%s-%s' % (process.current_process()._config['semprefix'],
+ next(SemLock._rand))
+
+#
+# Semaphore
+#
+
+class Semaphore(SemLock):
+
+ def __init__(self, value=1, *, ctx):
+ SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
+
+ def get_value(self):
+ return self._semlock._get_value()
+
+ def __repr__(self):
+ try:
+ value = self._semlock._get_value()
+ except Exception:
+ value = 'unknown'
+ return '<%s(value=%s)>' % (self.__class__.__name__, value)
+
+#
+# Bounded semaphore
+#
+
+class BoundedSemaphore(Semaphore):
+
+ def __init__(self, value=1, *, ctx):
+ SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
+
+ def __repr__(self):
+ try:
+ value = self._semlock._get_value()
+ except Exception:
+ value = 'unknown'
+ return '<%s(value=%s, maxvalue=%s)>' % \
+ (self.__class__.__name__, value, self._semlock.maxvalue)
+
+#
+# Non-recursive lock
+#
+
+class Lock(SemLock):
+
+ def __init__(self, *, ctx):
+ SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
+
+ def __repr__(self):
+ try:
+ if self._semlock._is_mine():
+ name = process.current_process().name
+ if threading.current_thread().name != 'MainThread':
+ name += '|' + threading.current_thread().name
+ elif self._semlock._get_value() == 1:
+ name = 'None'
+ elif self._semlock._count() > 0:
+ name = 'SomeOtherThread'
+ else:
+ name = 'SomeOtherProcess'
+ except Exception:
+ name = 'unknown'
+ return '<%s(owner=%s)>' % (self.__class__.__name__, name)
+
+#
+# Recursive lock
+#
+
+class RLock(SemLock):
+
+ def __init__(self, *, ctx):
+ SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
+
+ def __repr__(self):
+ try:
+ if self._semlock._is_mine():
+ name = process.current_process().name
+ if threading.current_thread().name != 'MainThread':
+ name += '|' + threading.current_thread().name
+ count = self._semlock._count()
+ elif self._semlock._get_value() == 1:
+ name, count = 'None', 0
+ elif self._semlock._count() > 0:
+ name, count = 'SomeOtherThread', 'nonzero'
+ else:
+ name, count = 'SomeOtherProcess', 'nonzero'
+ except Exception:
+ name, count = 'unknown', 'unknown'
+ return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
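[editor's note] The RECURSIVE_MUTEX kind lets the owner re-acquire without deadlocking, releasing once per acquire; a tiny single-process sketch:

import multiprocessing as mp

if __name__ == '__main__':
    rl = mp.RLock()
    with rl:
        with rl:              # second acquire by the same owner succeeds
            print('nested acquire ok')
    # the inner acquire would deadlock with a non-recursive mp.Lock()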
+
+#
+# Condition variable
+#
+
+class Condition(object):
+
+ def __init__(self, lock=None, *, ctx):
+ self._lock = lock or ctx.RLock()
+ self._sleeping_count = ctx.Semaphore(0)
+ self._woken_count = ctx.Semaphore(0)
+ self._wait_semaphore = ctx.Semaphore(0)
+ self._make_methods()
+
+ def __getstate__(self):
+ context.assert_spawning(self)
+ return (self._lock, self._sleeping_count,
+ self._woken_count, self._wait_semaphore)
+
+ def __setstate__(self, state):
+ (self._lock, self._sleeping_count,
+ self._woken_count, self._wait_semaphore) = state
+ self._make_methods()
+
+ def __enter__(self):
+ return self._lock.__enter__()
+
+ def __exit__(self, *args):
+ return self._lock.__exit__(*args)
+
+ def _make_methods(self):
+ self.acquire = self._lock.acquire
+ self.release = self._lock.release
+
+ def __repr__(self):
+ try:
+ num_waiters = (self._sleeping_count._semlock._get_value() -
+ self._woken_count._semlock._get_value())
+ except Exception:
+ num_waiters = 'unknown'
+ return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)
+
+ def wait(self, timeout=None):
+ assert self._lock._semlock._is_mine(), \
+ 'must acquire() condition before using wait()'
+
+ # indicate that this thread is going to sleep
+ self._sleeping_count.release()
+
+ # release lock
+ count = self._lock._semlock._count()
+ for i in range(count):
+ self._lock.release()
+
+ try:
+ # wait for notification or timeout
+ return self._wait_semaphore.acquire(True, timeout)
+ finally:
+ # indicate that this thread has woken
+ self._woken_count.release()
+
+ # reacquire lock
+ for i in range(count):
+ self._lock.acquire()
+
+ def notify(self, n=1):
+ assert self._lock._semlock._is_mine(), 'lock is not owned'
+ assert not self._wait_semaphore.acquire(
+ False), ('notify: Should not have been able to acquire '
+ + '_wait_semaphore')
+
+ # to take account of timeouts since last notify*() we subtract
+ # woken_count from sleeping_count and rezero woken_count
+ while self._woken_count.acquire(False):
+ res = self._sleeping_count.acquire(False)
+            assert res, ('notify: Bug in sleeping_count.acquire '
+                         + '- res should not be False')
+
+ sleepers = 0
+ while sleepers < n and self._sleeping_count.acquire(False):
+ self._wait_semaphore.release() # wake up one sleeper
+ sleepers += 1
+
+ if sleepers:
+ for i in range(sleepers):
+ self._woken_count.acquire() # wait for a sleeper to wake
+
+ # rezero wait_semaphore in case some timeouts just happened
+ while self._wait_semaphore.acquire(False):
+ pass
+
+ def notify_all(self):
+ self.notify(n=sys.maxsize)
+
+ def wait_for(self, predicate, timeout=None):
+ result = predicate()
+ if result:
+ return result
+ if timeout is not None:
+ endtime = time.monotonic() + timeout
+ else:
+ endtime = None
+ waittime = None
+ while not result:
+ if endtime is not None:
+ waittime = endtime - time.monotonic()
+ if waittime <= 0:
+ break
+ self.wait(waittime)
+ result = predicate()
+ return result
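[editor's note] A hedged producer/consumer sketch of the class above. The Manager-backed list and the 5-second timeout are illustrative choices; wait_for() returns early if the predicate already holds, so there is no lost-wakeup race with the parent's notify().

import multiprocessing as mp

def consumer(cond, items):
    with cond:
        if cond.wait_for(lambda: len(items) > 0, timeout=5):
            print('got', items[0])

if __name__ == '__main__':
    mgr = mp.Manager()
    items = mgr.list()
    cond = mp.Condition()
    c = mp.Process(target=consumer, args=(cond, items))
    c.start()
    with cond:
        items.append('payload')
        cond.notify()
    c.join()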
+
+#
+# Event
+#
+
+class Event(object):
+
+ def __init__(self, *, ctx):
+ self._cond = ctx.Condition(ctx.Lock())
+ self._flag = ctx.Semaphore(0)
+
+ def is_set(self):
+ with self._cond:
+ if self._flag.acquire(False):
+ self._flag.release()
+ return True
+ return False
+
+ def set(self):
+ with self._cond:
+ self._flag.acquire(False)
+ self._flag.release()
+ self._cond.notify_all()
+
+ def clear(self):
+ with self._cond:
+ self._flag.acquire(False)
+
+ def wait(self, timeout=None):
+ with self._cond:
+ if self._flag.acquire(False):
+ self._flag.release()
+ else:
+ self._cond.wait(timeout)
+
+ if self._flag.acquire(False):
+ self._flag.release()
+ return True
+ return False
+
+ def __repr__(self) -> str:
+ set_status = 'set' if self.is_set() else 'unset'
+ return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>"
+
+#
+# Barrier
+#
+
+class Barrier(threading.Barrier):
+
+ def __init__(self, parties, action=None, timeout=None, *, ctx):
+ import struct
+ from .heap import BufferWrapper
+ wrapper = BufferWrapper(struct.calcsize('i') * 2)
+ cond = ctx.Condition()
+ self.__setstate__((parties, action, timeout, cond, wrapper))
+ self._state = 0
+ self._count = 0
+
+ def __setstate__(self, state):
+ (self._parties, self._action, self._timeout,
+ self._cond, self._wrapper) = state
+ self._array = self._wrapper.create_memoryview().cast('i')
+
+ def __getstate__(self):
+ return (self._parties, self._action, self._timeout,
+ self._cond, self._wrapper)
+
+ @property
+ def _state(self):
+ return self._array[0]
+
+ @_state.setter
+ def _state(self, value):
+ self._array[0] = value
+
+ @property
+ def _count(self):
+ return self._array[1]
+
+ @_count.setter
+ def _count(self, value):
+ self._array[1] = value
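[editor's note] An illustrative rendezvous of three processes. Because the shared _state/_count words live in the BufferWrapper above, the barrier keeps working after being pickled to the children.

import multiprocessing as mp

def phase(barrier, i):
    print('process', i, 'ready')
    barrier.wait()            # blocks until all three parties arrive
    print('process', i, 'past the barrier')

if __name__ == '__main__':
    b = mp.Barrier(3)
    procs = [mp.Process(target=phase, args=(b, i)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()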
diff --git a/contrib/tools/python3/Lib/multiprocessing/util.py b/contrib/tools/python3/Lib/multiprocessing/util.py
new file mode 100644
index 0000000000..eb2cea07e1
--- /dev/null
+++ b/contrib/tools/python3/Lib/multiprocessing/util.py
@@ -0,0 +1,509 @@
+#
+# Module providing various facilities to other parts of the package
+#
+# multiprocessing/util.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import os
+import itertools
+import sys
+import weakref
+import atexit
+import threading        # we want threading to install its
+                        # cleanup function before multiprocessing does
+from subprocess import _args_from_interpreter_flags
+
+from . import process
+
+__all__ = [
+ 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
+ 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
+ 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
+ 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
+ ]
+
+#
+# Logging
+#
+
+NOTSET = 0
+SUBDEBUG = 5
+DEBUG = 10
+INFO = 20
+SUBWARNING = 25
+
+LOGGER_NAME = 'multiprocessing'
+DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
+
+_logger = None
+_log_to_stderr = False
+
+def sub_debug(msg, *args):
+ if _logger:
+ _logger.log(SUBDEBUG, msg, *args, stacklevel=2)
+
+def debug(msg, *args):
+ if _logger:
+ _logger.log(DEBUG, msg, *args, stacklevel=2)
+
+def info(msg, *args):
+ if _logger:
+ _logger.log(INFO, msg, *args, stacklevel=2)
+
+def sub_warning(msg, *args):
+ if _logger:
+ _logger.log(SUBWARNING, msg, *args, stacklevel=2)
+
+def get_logger():
+ '''
+ Returns logger used by multiprocessing
+ '''
+ global _logger
+ import logging
+
+ logging._acquireLock()
+ try:
+ if not _logger:
+
+ _logger = logging.getLogger(LOGGER_NAME)
+ _logger.propagate = 0
+
+ # XXX multiprocessing should cleanup before logging
+ if hasattr(atexit, 'unregister'):
+ atexit.unregister(_exit_function)
+ atexit.register(_exit_function)
+ else:
+ atexit._exithandlers.remove((_exit_function, (), {}))
+ atexit._exithandlers.append((_exit_function, (), {}))
+
+ finally:
+ logging._releaseLock()
+
+ return _logger
+
+def log_to_stderr(level=None):
+ '''
+ Turn on logging and add a handler which prints to stderr
+ '''
+ global _log_to_stderr
+ import logging
+
+ logger = get_logger()
+ formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
+ handler = logging.StreamHandler()
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+
+ if level:
+ logger.setLevel(level)
+ _log_to_stderr = True
+ return _logger
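[editor's note] Typical use from application code (a sketch; the INFO level is an arbitrary choice):

import logging
import multiprocessing as mp

if __name__ == '__main__':
    mp.log_to_stderr(logging.INFO)
    p = mp.Process(target=print, args=('hello',))
    p.start()
    p.join()    # lines like '[INFO/Process-1] process shutting down' on stderr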
+
+
+# Abstract socket support
+
+def _platform_supports_abstract_sockets():
+ if sys.platform == "linux":
+ return True
+ if hasattr(sys, 'getandroidapilevel'):
+ return True
+ return False
+
+
+def is_abstract_socket_namespace(address):
+ if not address:
+ return False
+ if isinstance(address, bytes):
+ return address[0] == 0
+ elif isinstance(address, str):
+ return address[0] == "\0"
+ raise TypeError(f'address type of {address!r} unrecognized')
+
+
+abstract_sockets_supported = _platform_supports_abstract_sockets()
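[editor's note] Illustration of the address test above; the address strings are made up. A leading NUL byte marks a Linux abstract-namespace address, which needs no filesystem entry.

from multiprocessing import util

print(util.is_abstract_socket_namespace('\0mp-demo'))        # True: leading NUL
print(util.is_abstract_socket_namespace('/tmp/mp-demo'))     # False: filesystem path
print(util.abstract_sockets_supported)    # True only on Linux/Android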
+
+#
+# Function returning a temp directory which will be removed on exit
+#
+
+def _remove_temp_dir(rmtree, tempdir):
+ def onerror(func, path, err_info):
+ if not issubclass(err_info[0], FileNotFoundError):
+ raise
+ rmtree(tempdir, onerror=onerror)
+
+ current_process = process.current_process()
+ # current_process() can be None if the finalizer is called
+ # late during Python finalization
+ if current_process is not None:
+ current_process._config['tempdir'] = None
+
+def get_temp_dir():
+ # get name of a temp directory which will be automatically cleaned up
+ tempdir = process.current_process()._config.get('tempdir')
+ if tempdir is None:
+ import shutil, tempfile
+ tempdir = tempfile.mkdtemp(prefix='pymp-')
+ info('created temp directory %s', tempdir)
+ # keep a strong reference to shutil.rmtree(), since the finalizer
+ # can be called late during Python shutdown
+ Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
+ exitpriority=-100)
+ process.current_process()._config['tempdir'] = tempdir
+ return tempdir
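[editor's note] Sketch of what a caller sees; the exact path is platform-dependent and the sample output is made up.

from multiprocessing import util

d = util.get_temp_dir()     # created lazily, cached in the process config
print(d)                    # e.g. /tmp/pymp-a1b2c3; removed at exit by Finalize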
+
+#
+# Support for reinitialization of objects when bootstrapping a child process
+#
+
+_afterfork_registry = weakref.WeakValueDictionary()
+_afterfork_counter = itertools.count()
+
+def _run_after_forkers():
+ items = list(_afterfork_registry.items())
+ items.sort()
+ for (index, ident, func), obj in items:
+ try:
+ func(obj)
+ except Exception as e:
+ info('after forker raised exception %s', e)
+
+def register_after_fork(obj, func):
+ _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
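[editor's note] A sketch of the hook with an explicit 'fork' context (so it does not apply on Windows): each forked child clears an inherited cache before user code runs. The Cache class and field names are hypothetical.

import os
import multiprocessing as mp
from multiprocessing import util

class Cache:
    def __init__(self):
        self.data = {}
        # runs in every forked child during bootstrap, not in the parent
        util.register_after_fork(self, lambda obj: obj.data.clear())

def show(cache):
    print(os.getpid(), cache.data)    # {} -- the hook emptied it

if __name__ == '__main__':
    cache = Cache()
    cache.data['parent'] = True
    ctx = mp.get_context('fork')      # hooks only fire for fork starts
    p = ctx.Process(target=show, args=(cache,))
    p.start()
    p.join()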
+
+#
+# Finalization using weakrefs
+#
+
+_finalizer_registry = {}
+_finalizer_counter = itertools.count()
+
+
+class Finalize(object):
+ '''
+ Class which supports object finalization using weakrefs
+ '''
+ def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
+ if (exitpriority is not None) and not isinstance(exitpriority,int):
+ raise TypeError(
+ "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
+ exitpriority, type(exitpriority)))
+
+ if obj is not None:
+ self._weakref = weakref.ref(obj, self)
+ elif exitpriority is None:
+ raise ValueError("Without object, exitpriority cannot be None")
+
+ self._callback = callback
+ self._args = args
+ self._kwargs = kwargs or {}
+ self._key = (exitpriority, next(_finalizer_counter))
+ self._pid = os.getpid()
+
+ _finalizer_registry[self._key] = self
+
+ def __call__(self, wr=None,
+ # Need to bind these locally because the globals can have
+ # been cleared at shutdown
+ _finalizer_registry=_finalizer_registry,
+ sub_debug=sub_debug, getpid=os.getpid):
+ '''
+ Run the callback unless it has already been called or cancelled
+ '''
+ try:
+ del _finalizer_registry[self._key]
+ except KeyError:
+ sub_debug('finalizer no longer registered')
+ else:
+ if self._pid != getpid():
+ sub_debug('finalizer ignored because different process')
+ res = None
+ else:
+ sub_debug('finalizer calling %s with args %s and kwargs %s',
+ self._callback, self._args, self._kwargs)
+ res = self._callback(*self._args, **self._kwargs)
+ self._weakref = self._callback = self._args = \
+ self._kwargs = self._key = None
+ return res
+
+ def cancel(self):
+ '''
+ Cancel finalization of the object
+ '''
+ try:
+ del _finalizer_registry[self._key]
+ except KeyError:
+ pass
+ else:
+ self._weakref = self._callback = self._args = \
+ self._kwargs = self._key = None
+
+ def still_active(self):
+ '''
+ Return whether this finalizer is still waiting to invoke callback
+ '''
+ return self._key in _finalizer_registry
+
+ def __repr__(self):
+ try:
+ obj = self._weakref()
+ except (AttributeError, TypeError):
+ obj = None
+
+ if obj is None:
+ return '<%s object, dead>' % self.__class__.__name__
+
+ x = '<%s object, callback=%s' % (
+ self.__class__.__name__,
+ getattr(self._callback, '__name__', self._callback))
+ if self._args:
+ x += ', args=' + str(self._args)
+ if self._kwargs:
+ x += ', kwargs=' + str(self._kwargs)
+ if self._key[0] is not None:
+ x += ', exitpriority=' + str(self._key[0])
+ return x + '>'
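[editor's note] Usage sketch: the weakref fires when the tracked object is collected, and any surviving finalizers with an exitpriority run from _exit_function at shutdown. The class, names, and priority below are illustrative.

from multiprocessing.util import Finalize

class Resource:
    def __init__(self, name):
        # called at most once: on collection of self, or at interpreter exit
        self._finalizer = Finalize(self, print,
                                   args=('closing', name), exitpriority=10)

if __name__ == '__main__':
    r = Resource('db-handle')
    del r                      # prints: closing db-handle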
+
+
+def _run_finalizers(minpriority=None):
+ '''
+ Run all finalizers whose exit priority is not None and at least minpriority
+
+ Finalizers with highest priority are called first; finalizers with
+ the same priority will be called in reverse order of creation.
+ '''
+ if _finalizer_registry is None:
+ # This function may be called after this module's globals are
+ # destroyed. See the _exit_function function in this module for more
+ # notes.
+ return
+
+ if minpriority is None:
+ f = lambda p : p[0] is not None
+ else:
+ f = lambda p : p[0] is not None and p[0] >= minpriority
+
+ # Careful: _finalizer_registry may be mutated while this function
+ # is running (either by a GC run or by another thread).
+
+ # list(_finalizer_registry) should be atomic, while
+ # list(_finalizer_registry.items()) is not.
+ keys = [key for key in list(_finalizer_registry) if f(key)]
+ keys.sort(reverse=True)
+
+ for key in keys:
+ finalizer = _finalizer_registry.get(key)
+ # key may have been removed from the registry
+ if finalizer is not None:
+ sub_debug('calling %s', finalizer)
+ try:
+ finalizer()
+ except Exception:
+ import traceback
+ traceback.print_exc()
+
+ if minpriority is None:
+ _finalizer_registry.clear()
+
+#
+# Clean up on exit
+#
+
+def is_exiting():
+ '''
+ Returns true if the process is shutting down
+ '''
+ return _exiting or _exiting is None
+
+_exiting = False
+
+def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
+ active_children=process.active_children,
+ current_process=process.current_process):
+ # We hold on to references to functions in the arglist due to the
+ # situation described below, where this function is called after this
+ # module's globals are destroyed.
+
+ global _exiting
+
+ if not _exiting:
+ _exiting = True
+
+ info('process shutting down')
+ debug('running all "atexit" finalizers with priority >= 0')
+ _run_finalizers(0)
+
+ if current_process() is not None:
+ # We check if the current process is None here because if
+ # it's None, any call to ``active_children()`` will raise
+ # an AttributeError (active_children winds up trying to
+ # get attributes from util._current_process). One
+ # situation where this can happen is if someone has
+ # manipulated sys.modules, causing this module to be
+ # garbage collected. The destructor for the module type
+ # then replaces all values in the module dict with None.
+ # For instance, after setuptools runs a test it replaces
+ # sys.modules with a copy created earlier. See issues
+ # #9775 and #15881. Also related: #4106, #9205, and
+ # #9207.
+
+ for p in active_children():
+ if p.daemon:
+ info('calling terminate() for daemon %s', p.name)
+ p._popen.terminate()
+
+ for p in active_children():
+ info('calling join() for process %s', p.name)
+ p.join()
+
+ debug('running the remaining "atexit" finalizers')
+ _run_finalizers()
+
+atexit.register(_exit_function)
+
+#
+# Some fork aware types
+#
+
+class ForkAwareThreadLock(object):
+ def __init__(self):
+ self._lock = threading.Lock()
+ self.acquire = self._lock.acquire
+ self.release = self._lock.release
+ register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)
+
+ def _at_fork_reinit(self):
+ self._lock._at_fork_reinit()
+
+ def __enter__(self):
+ return self._lock.__enter__()
+
+ def __exit__(self, *args):
+ return self._lock.__exit__(*args)
+
+
+class ForkAwareLocal(threading.local):
+ def __new__(cls):
+ self = threading.local.__new__(cls)
+ register_after_fork(self, lambda obj: obj.__dict__.clear())
+ return self
+
+ def __reduce__(self):
+ return type(self), ()
+
+#
+# Close fds except those specified
+#
+
+try:
+ MAXFD = os.sysconf("SC_OPEN_MAX")
+except Exception:
+ MAXFD = 256
+
+def close_all_fds_except(fds):
+ fds = list(fds) + [-1, MAXFD]
+ fds.sort()
+ assert fds[-1] == MAXFD, 'fd too large'
+ for i in range(len(fds) - 1):
+ os.closerange(fds[i]+1, fds[i+1])
+
+#
+# Close sys.stdin and replace stdin with os.devnull
+#
+
+def _close_stdin():
+ if sys.stdin is None:
+ return
+
+ try:
+ sys.stdin.close()
+ except (OSError, ValueError):
+ pass
+
+ try:
+ fd = os.open(os.devnull, os.O_RDONLY)
+ try:
+ sys.stdin = open(fd, encoding="utf-8", closefd=False)
+ except:
+ os.close(fd)
+ raise
+ except (OSError, ValueError):
+ pass
+
+#
+# Flush standard streams, if any
+#
+
+def _flush_std_streams():
+ try:
+ sys.stdout.flush()
+ except (AttributeError, ValueError):
+ pass
+ try:
+ sys.stderr.flush()
+ except (AttributeError, ValueError):
+ pass
+
+#
+# Start a program with only specified fds kept open
+#
+
+
+def _env_list():
+ # Based on fork_exec in subprocess.py.
+ env = os.environ.copy()
+ env['Y_PYTHON_ENTRY_POINT'] = ':main'
+ env_list = []
+ for k, v in env.items():
+ if '=' not in k:
+ env_list.append(os.fsencode(k) + b'=' + os.fsencode(v))
+ return env_list
+
+
+def spawnv_passfds(path, args, passfds):
+ import _posixsubprocess
+ import subprocess
+ passfds = tuple(sorted(map(int, passfds)))
+ errpipe_read, errpipe_write = os.pipe()
+ try:
+ return _posixsubprocess.fork_exec(
+ args, [path], True, passfds, None, _env_list(),
+ -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
+ False, False, -1, None, None, None, -1, None,
+ subprocess._USE_VFORK)
+ finally:
+ os.close(errpipe_read)
+ os.close(errpipe_write)
+
+
+def close_fds(*fds):
+ """Close each file descriptor given as an argument"""
+ for fd in fds:
+ os.close(fd)
+
+
+def _cleanup_tests():
+ """Cleanup multiprocessing resources when multiprocessing tests
+ completed."""
+
+ from test import support
+
+ # cleanup multiprocessing
+ process._cleanup()
+
+ # Stop the ForkServer process if it's running
+ from multiprocessing import forkserver
+ forkserver._forkserver._stop()
+
+ # Stop the ResourceTracker process if it's running
+ from multiprocessing import resource_tracker
+ resource_tracker._resource_tracker._stop()
+
+ # bpo-37421: Explicitly call _run_finalizers() to remove immediately
+ # temporary directories created by multiprocessing.util.get_temp_dir().
+ _run_finalizers()
+ support.gc_collect()
+
+ support.reap_children()