aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/multiprocessing/reduction.py
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/tools/python3/src/Lib/multiprocessing/reduction.py
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing/reduction.py')
-rw-r--r--contrib/tools/python3/src/Lib/multiprocessing/reduction.py281
1 files changed, 281 insertions, 0 deletions
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/reduction.py b/contrib/tools/python3/src/Lib/multiprocessing/reduction.py
new file mode 100644
index 0000000000..5593f0682f
--- /dev/null
+++ b/contrib/tools/python3/src/Lib/multiprocessing/reduction.py
@@ -0,0 +1,281 @@
+#
+# Module which deals with pickling of objects.
+#
+# multiprocessing/reduction.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+from abc import ABCMeta
+import copyreg
+import functools
+import io
+import os
+import pickle
+import socket
+import sys
+
+from . import context
+
+__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
+
+
+HAVE_SEND_HANDLE = (sys.platform == 'win32' or
+ (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS') and
+ hasattr(socket.socket, 'sendmsg')))
+
+#
+# Pickler subclass
+#
+
+class ForkingPickler(pickle.Pickler):
+ '''Pickler subclass used by multiprocessing.'''
+ _extra_reducers = {}
+ _copyreg_dispatch_table = copyreg.dispatch_table
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.dispatch_table = self._copyreg_dispatch_table.copy()
+ self.dispatch_table.update(self._extra_reducers)
+
+ @classmethod
+ def register(cls, type, reduce):
+ '''Register a reduce function for a type.'''
+ cls._extra_reducers[type] = reduce
+
+ @classmethod
+ def dumps(cls, obj, protocol=None):
+ buf = io.BytesIO()
+ cls(buf, protocol).dump(obj)
+ return buf.getbuffer()
+
+ loads = pickle.loads
+
+register = ForkingPickler.register
+
+def dump(obj, file, protocol=None):
+ '''Replacement for pickle.dump() using ForkingPickler.'''
+ ForkingPickler(file, protocol).dump(obj)
+
+#
+# Platform specific definitions
+#
+
+if sys.platform == 'win32':
+ # Windows
+ __all__ += ['DupHandle', 'duplicate', 'steal_handle']
+ import _winapi
+
+ def duplicate(handle, target_process=None, inheritable=False,
+ *, source_process=None):
+ '''Duplicate a handle. (target_process is a handle not a pid!)'''
+ current_process = _winapi.GetCurrentProcess()
+ if source_process is None:
+ source_process = current_process
+ if target_process is None:
+ target_process = current_process
+ return _winapi.DuplicateHandle(
+ source_process, handle, target_process,
+ 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
+
+ def steal_handle(source_pid, handle):
+ '''Steal a handle from process identified by source_pid.'''
+ source_process_handle = _winapi.OpenProcess(
+ _winapi.PROCESS_DUP_HANDLE, False, source_pid)
+ try:
+ return _winapi.DuplicateHandle(
+ source_process_handle, handle,
+ _winapi.GetCurrentProcess(), 0, False,
+ _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(source_process_handle)
+
+ def send_handle(conn, handle, destination_pid):
+ '''Send a handle over a local connection.'''
+ dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
+ conn.send(dh)
+
+ def recv_handle(conn):
+ '''Receive a handle over a local connection.'''
+ return conn.recv().detach()
+
+ class DupHandle(object):
+ '''Picklable wrapper for a handle.'''
+ def __init__(self, handle, access, pid=None):
+ if pid is None:
+ # We just duplicate the handle in the current process and
+ # let the receiving process steal the handle.
+ pid = os.getpid()
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
+ try:
+ self._handle = _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(),
+ handle, proc, access, False, 0)
+ finally:
+ _winapi.CloseHandle(proc)
+ self._access = access
+ self._pid = pid
+
+ def detach(self):
+ '''Get the handle. This should only be called once.'''
+ # retrieve handle from process which currently owns it
+ if self._pid == os.getpid():
+ # The handle has already been duplicated for this process.
+ return self._handle
+ # We must steal the handle from the process whose pid is self._pid.
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
+ self._pid)
+ try:
+ return _winapi.DuplicateHandle(
+ proc, self._handle, _winapi.GetCurrentProcess(),
+ self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(proc)
+
+else:
+ # Unix
+ __all__ += ['DupFd', 'sendfds', 'recvfds']
+ import array
+
+ # On MacOSX we should acknowledge receipt of fds -- see Issue14669
+ ACKNOWLEDGE = sys.platform == 'darwin'
+
+ def sendfds(sock, fds):
+ '''Send an array of fds over an AF_UNIX socket.'''
+ fds = array.array('i', fds)
+ msg = bytes([len(fds) % 256])
+ sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+ if ACKNOWLEDGE and sock.recv(1) != b'A':
+ raise RuntimeError('did not receive acknowledgement of fd')
+
+ def recvfds(sock, size):
+ '''Receive an array of fds over an AF_UNIX socket.'''
+ a = array.array('i')
+ bytes_size = a.itemsize * size
+ msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
+ if not msg and not ancdata:
+ raise EOFError
+ try:
+ if ACKNOWLEDGE:
+ sock.send(b'A')
+ if len(ancdata) != 1:
+ raise RuntimeError('received %d items of ancdata' %
+ len(ancdata))
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ if len(cmsg_data) % a.itemsize != 0:
+ raise ValueError
+ a.frombytes(cmsg_data)
+ if len(a) % 256 != msg[0]:
+ raise AssertionError(
+ "Len is {0:n} but msg[0] is {1!r}".format(
+ len(a), msg[0]))
+ return list(a)
+ except (ValueError, IndexError):
+ pass
+ raise RuntimeError('Invalid data received')
+
+ def send_handle(conn, handle, destination_pid):
+ '''Send a handle over a local connection.'''
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ sendfds(s, [handle])
+
+ def recv_handle(conn):
+ '''Receive a handle over a local connection.'''
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ return recvfds(s, 1)[0]
+
+ def DupFd(fd):
+ '''Return a wrapper for an fd.'''
+ popen_obj = context.get_spawning_popen()
+ if popen_obj is not None:
+ return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
+ elif HAVE_SEND_HANDLE:
+ from . import resource_sharer
+ return resource_sharer.DupFd(fd)
+ else:
+ raise ValueError('SCM_RIGHTS appears not to be available')
+
+#
+# Try making some callable types picklable
+#
+
+def _reduce_method(m):
+ if m.__self__ is None:
+ return getattr, (m.__class__, m.__func__.__name__)
+ else:
+ return getattr, (m.__self__, m.__func__.__name__)
+class _C:
+ def f(self):
+ pass
+register(type(_C().f), _reduce_method)
+
+
+def _reduce_method_descriptor(m):
+ return getattr, (m.__objclass__, m.__name__)
+register(type(list.append), _reduce_method_descriptor)
+register(type(int.__add__), _reduce_method_descriptor)
+
+
+def _reduce_partial(p):
+ return _rebuild_partial, (p.func, p.args, p.keywords or {})
+def _rebuild_partial(func, args, keywords):
+ return functools.partial(func, *args, **keywords)
+register(functools.partial, _reduce_partial)
+
+#
+# Make sockets picklable
+#
+
+if sys.platform == 'win32':
+ def _reduce_socket(s):
+ from .resource_sharer import DupSocket
+ return _rebuild_socket, (DupSocket(s),)
+ def _rebuild_socket(ds):
+ return ds.detach()
+ register(socket.socket, _reduce_socket)
+
+else:
+ def _reduce_socket(s):
+ df = DupFd(s.fileno())
+ return _rebuild_socket, (df, s.family, s.type, s.proto)
+ def _rebuild_socket(df, family, type, proto):
+ fd = df.detach()
+ return socket.socket(family, type, proto, fileno=fd)
+ register(socket.socket, _reduce_socket)
+
+
+class AbstractReducer(metaclass=ABCMeta):
+ '''Abstract base class for use in implementing a Reduction class
+ suitable for use in replacing the standard reduction mechanism
+ used in multiprocessing.'''
+ ForkingPickler = ForkingPickler
+ register = register
+ dump = dump
+ send_handle = send_handle
+ recv_handle = recv_handle
+
+ if sys.platform == 'win32':
+ steal_handle = steal_handle
+ duplicate = duplicate
+ DupHandle = DupHandle
+ else:
+ sendfds = sendfds
+ recvfds = recvfds
+ DupFd = DupFd
+
+ _reduce_method = _reduce_method
+ _reduce_method_descriptor = _reduce_method_descriptor
+ _rebuild_partial = _rebuild_partial
+ _reduce_socket = _reduce_socket
+ _rebuild_socket = _rebuild_socket
+
+ def __init__(self, *args):
+ register(type(_C().f), _reduce_method)
+ register(type(list.append), _reduce_method_descriptor)
+ register(type(int.__add__), _reduce_method_descriptor)
+ register(functools.partial, _reduce_partial)
+ register(socket.socket, _reduce_socket)