author     shadchin <shadchin@yandex-team.ru>            2022-02-10 16:44:30 +0300
committer  Daniil Cherednik <dcherednik@yandex-team.ru>  2022-02-10 16:44:30 +0300
commit     2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)
tree       012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/multiprocessing
parent     6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)
download   ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing')
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/__init__.py          |    4
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/connection.py        |   56
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/context.py           |   24
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py    |    2
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/forkserver.py        |   60
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/heap.py              |  182
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/managers.py          |  358
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/pool.py              |  398
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py        |   16
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py  |   10
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py |   18
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py |   18
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/process.py           |  144
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/queues.py            |   44
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/reduction.py         |   14
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py   |    6
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py  |  462
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py     | 1064
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/spawn.py             |   34
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/synchronize.py       |   10
-rw-r--r--  contrib/tools/python3/src/Lib/multiprocessing/util.py              |  146
21 files changed, 1535 insertions(+), 1535 deletions(-)
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/__init__.py b/contrib/tools/python3/src/Lib/multiprocessing/__init__.py
index 8336f381de..d412235024 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/__init__.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/__init__.py
@@ -19,8 +19,8 @@ from . import context
# Copy stuff from default context
#
-__all__ = [x for x in dir(context._default_context) if not x.startswith('_')]
-globals().update((name, getattr(context._default_context, name)) for name in __all__)
+__all__ = [x for x in dir(context._default_context) if not x.startswith('_')]
+globals().update((name, getattr(context._default_context, name)) for name in __all__)
#
# XXX These should not really be documented or public.
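
The hunk above is the re-export step that makes the package namespace mirror the default context. A minimal check of the effect, using only the public API:

    import multiprocessing

    # Names such as Pool, Queue and get_start_method are bound methods of
    # context._default_context copied into the package globals above.
    print('Pool' in multiprocessing.__all__)     # True
    print(multiprocessing.get_start_method())    # e.g. 'fork' or 'spawn'
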
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/connection.py b/contrib/tools/python3/src/Lib/multiprocessing/connection.py
index 510e4b5aba..850c317da5 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/connection.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/connection.py
@@ -73,11 +73,11 @@ def arbitrary_address(family):
if family == 'AF_INET':
return ('localhost', 0)
elif family == 'AF_UNIX':
- # Prefer abstract sockets if possible to avoid problems with the address
- # size. When coding portable applications, some implementations have
- # sun_path as short as 92 bytes in the sockaddr_un struct.
- if util.abstract_sockets_supported:
- return f"\0listener-{os.getpid()}-{next(_mmap_counter)}"
+ # Prefer abstract sockets if possible to avoid problems with the address
+ # size. When coding portable applications, some implementations have
+ # sun_path as short as 92 bytes in the sockaddr_un struct.
+ if util.abstract_sockets_supported:
+ return f"\0listener-{os.getpid()}-{next(_mmap_counter)}"
return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
@@ -107,7 +107,7 @@ def address_type(address):
return 'AF_INET'
elif type(address) is str and address.startswith('\\\\'):
return 'AF_PIPE'
- elif type(address) is str or util.is_abstract_socket_namespace(address):
+ elif type(address) is str or util.is_abstract_socket_namespace(address):
return 'AF_UNIX'
else:
raise ValueError('address type of %r unrecognized' % address)
@@ -394,33 +394,33 @@ class Connection(_ConnectionBase):
def _send_bytes(self, buf):
n = len(buf)
- if n > 0x7fffffff:
- pre_header = struct.pack("!i", -1)
- header = struct.pack("!Q", n)
- self._send(pre_header)
+ if n > 0x7fffffff:
+ pre_header = struct.pack("!i", -1)
+ header = struct.pack("!Q", n)
+ self._send(pre_header)
self._send(header)
self._send(buf)
else:
- # For wire compatibility with 3.7 and lower
- header = struct.pack("!i", n)
- if n > 16384:
- # The payload is large so Nagle's algorithm won't be triggered
- # and we'd better avoid the cost of concatenation.
- self._send(header)
- self._send(buf)
- else:
- # Issue #20540: concatenate before sending, to avoid delays due
- # to Nagle's algorithm on a TCP socket.
- # Also note we want to avoid sending a 0-length buffer separately,
- # to avoid "broken pipe" errors if the other end closed the pipe.
- self._send(header + buf)
+ # For wire compatibility with 3.7 and lower
+ header = struct.pack("!i", n)
+ if n > 16384:
+ # The payload is large so Nagle's algorithm won't be triggered
+ # and we'd better avoid the cost of concatenation.
+ self._send(header)
+ self._send(buf)
+ else:
+ # Issue #20540: concatenate before sending, to avoid delays due
+ # to Nagle's algorithm on a TCP socket.
+ # Also note we want to avoid sending a 0-length buffer separately,
+ # to avoid "broken pipe" errors if the other end closed the pipe.
+ self._send(header + buf)
def _recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
- if size == -1:
- buf = self._recv(8)
- size, = struct.unpack("!Q", buf.getvalue())
+ if size == -1:
+ buf = self._recv(8)
+ size, = struct.unpack("!Q", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)
@@ -602,8 +602,8 @@ class SocketListener(object):
self._family = family
self._last_accepted = None
- if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
- # Linux abstract socket namespaces do not need to be explicitly unlinked
+ if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
+ # Linux abstract socket namespaces do not need to be explicitly unlinked
self._unlink = util.Finalize(
self, os.unlink, args=(address,), exitpriority=0
)
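
The _send_bytes changes above define the framing: a signed 32-bit big-endian length header, a -1 pre-header plus an unsigned 64-bit header for payloads over 0x7fffffff, and header+payload concatenation below 16 KiB to sidestep Nagle delays. A standalone sketch of that logic (the frame helper and send callable are illustrative, not part of the module):

    import struct

    def frame(buf, send):
        n = len(buf)
        if n > 0x7fffffff:
            # 64-bit length: the -1 pre-header announces the "!Q" header.
            send(struct.pack("!i", -1))
            send(struct.pack("!Q", n))
            send(buf)
        else:
            header = struct.pack("!i", n)   # wire-compatible with <= 3.7
            if n > 16384:
                # Payload is large enough that Nagle's algorithm is moot;
                # skip the concatenation copy.
                send(header)
                send(buf)
            else:
                # One write: avoids Nagle-induced delays and never sends a
                # 0-length buffer separately (no spurious "broken pipe").
                send(header + buf)

    chunks = []
    frame(b"hello", chunks.append)
    print(chunks)   # [b'\x00\x00\x00\x05hello']
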
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/context.py b/contrib/tools/python3/src/Lib/multiprocessing/context.py
index 8d0525d5d6..eb1917bcbf 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/context.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/context.py
@@ -5,7 +5,7 @@ import threading
from . import process
from . import reduction
-__all__ = ()
+__all__ = ()
#
# Exceptions
@@ -24,7 +24,7 @@ class AuthenticationError(ProcessError):
pass
#
-# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
+# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
#
class BaseContext(object):
@@ -35,7 +35,7 @@ class BaseContext(object):
AuthenticationError = AuthenticationError
current_process = staticmethod(process.current_process)
- parent_process = staticmethod(process.parent_process)
+ parent_process = staticmethod(process.parent_process)
active_children = staticmethod(process.active_children)
def cpu_count(self):
@@ -257,10 +257,10 @@ class DefaultContext(BaseContext):
if sys.platform == 'win32':
return ['spawn']
else:
- methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn']
+ methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn']
if reduction.HAVE_SEND_HANDLE:
- methods.append('forkserver')
- return methods
+ methods.append('forkserver')
+ return methods
#
@@ -310,12 +310,12 @@ if sys.platform != 'win32':
'spawn': SpawnContext(),
'forkserver': ForkServerContext(),
}
- if sys.platform == 'darwin':
- # bpo-33725: running arbitrary code after fork() is no longer reliable
- # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
- _default_context = DefaultContext(_concrete_contexts['spawn'])
- else:
- _default_context = DefaultContext(_concrete_contexts['fork'])
+ if sys.platform == 'darwin':
+ # bpo-33725: running arbitrary code after fork() is no longer reliable
+ # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
+ _default_context = DefaultContext(_concrete_contexts['spawn'])
+ else:
+ _default_context = DefaultContext(_concrete_contexts['fork'])
else:
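
The platform defaults above are observable through the public context API; output varies by OS:

    import multiprocessing

    print(multiprocessing.get_all_start_methods())
    # Linux:   ['fork', 'spawn', 'forkserver']
    # macOS:   ['spawn', 'fork', 'forkserver']   (spawn default per bpo-33725)
    # Windows: ['spawn']

    ctx = multiprocessing.get_context('spawn')   # opt in to a method explicitly
    print(type(ctx).__name__)                    # SpawnContext
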
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py b/contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py
index 6a1468609e..1ecf8376d3 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py
@@ -80,7 +80,7 @@ def freeze_support():
#
class Namespace(object):
- def __init__(self, /, **kwds):
+ def __init__(self, /, **kwds):
self.__dict__.update(kwds)
def __repr__(self):
items = list(self.__dict__.items())
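
The only change here is the positional-only marker on Namespace.__init__. Its practical effect: self can no longer be captured by keyword, so even 'self' becomes a legal attribute name:

    from multiprocessing.dummy import Namespace

    ns = Namespace(x=1, self='ok')   # TypeError before the "/" was added
    print(ns.x, ns.self)             # 1 ok
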
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/forkserver.py b/contrib/tools/python3/src/Lib/multiprocessing/forkserver.py
index 22a911a7a2..45193a1235 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/forkserver.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/forkserver.py
@@ -11,7 +11,7 @@ import warnings
from . import connection
from . import process
from .context import reduction
-from . import resource_tracker
+from . import resource_tracker
from . import spawn
from . import util
@@ -39,26 +39,26 @@ class ForkServer(object):
self._lock = threading.Lock()
self._preload_modules = ['__main__']
- def _stop(self):
- # Method used by unit tests to stop the server
- with self._lock:
- self._stop_unlocked()
-
- def _stop_unlocked(self):
- if self._forkserver_pid is None:
- return
-
- # closing the "alive" file descriptor asks the server to stop
- os.close(self._forkserver_alive_fd)
- self._forkserver_alive_fd = None
-
- os.waitpid(self._forkserver_pid, 0)
- self._forkserver_pid = None
-
- if not util.is_abstract_socket_namespace(self._forkserver_address):
- os.unlink(self._forkserver_address)
- self._forkserver_address = None
-
+ def _stop(self):
+ # Method used by unit tests to stop the server
+ with self._lock:
+ self._stop_unlocked()
+
+ def _stop_unlocked(self):
+ if self._forkserver_pid is None:
+ return
+
+ # closing the "alive" file descriptor asks the server to stop
+ os.close(self._forkserver_alive_fd)
+ self._forkserver_alive_fd = None
+
+ os.waitpid(self._forkserver_pid, 0)
+ self._forkserver_pid = None
+
+ if not util.is_abstract_socket_namespace(self._forkserver_address):
+ os.unlink(self._forkserver_address)
+ self._forkserver_address = None
+
def set_forkserver_preload(self, modules_names):
'''Set list of module names to try to load in forkserver process.'''
if not all(type(mod) is str for mod in self._preload_modules):
@@ -89,7 +89,7 @@ class ForkServer(object):
parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe()
allfds = [child_r, child_w, self._forkserver_alive_fd,
- resource_tracker.getfd()]
+ resource_tracker.getfd()]
allfds += fds
try:
reduction.sendfds(client, allfds)
@@ -110,7 +110,7 @@ class ForkServer(object):
ensure_running() will do nothing.
'''
with self._lock:
- resource_tracker.ensure_running()
+ resource_tracker.ensure_running()
if self._forkserver_pid is not None:
# forkserver was launched before, is it still running?
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
@@ -136,8 +136,8 @@ class ForkServer(object):
with socket.socket(socket.AF_UNIX) as listener:
address = connection.arbitrary_address('AF_UNIX')
listener.bind(address)
- if not util.is_abstract_socket_namespace(address):
- os.chmod(address, 0o600)
+ if not util.is_abstract_socket_namespace(address):
+ os.chmod(address, 0o600)
listener.listen()
# all client processes own the write end of the "alive" pipe;
@@ -237,8 +237,8 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
break
child_w = pid_to_fd.pop(pid, None)
if child_w is not None:
- returncode = os.waitstatus_to_exitcode(sts)
-
+ returncode = os.waitstatus_to_exitcode(sts)
+
# Send exit code to client process
try:
write_signed(child_w, returncode)
@@ -305,12 +305,12 @@ def _serve_one(child_r, fds, unused_fds, handlers):
os.close(fd)
(_forkserver._forkserver_alive_fd,
- resource_tracker._resource_tracker._fd,
+ resource_tracker._resource_tracker._fd,
*_forkserver._inherited_fds) = fds
# Run process object received over pipe
- parent_sentinel = os.dup(child_r)
- code = spawn._main(child_r, parent_sentinel)
+ parent_sentinel = os.dup(child_r)
+ code = spawn._main(child_r, parent_sentinel)
return code
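
Typical use of the forkserver pieces restored above (POSIX-only); set_forkserver_preload asks the server process to import modules once before any worker is forked:

    import multiprocessing as mp

    def work(x):
        return x * x

    if __name__ == '__main__':
        ctx = mp.get_context('forkserver')
        ctx.set_forkserver_preload(['__main__'])   # imported once, in the server
        with ctx.Pool(2) as pool:
            print(pool.map(work, range(4)))        # [0, 1, 4, 9]
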
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/heap.py b/contrib/tools/python3/src/Lib/multiprocessing/heap.py
index 6217dfe126..909c1e1df9 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/heap.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/heap.py
@@ -8,7 +8,7 @@
#
import bisect
-from collections import defaultdict
+from collections import defaultdict
import mmap
import os
import sys
@@ -29,9 +29,9 @@ if sys.platform == 'win32':
import _winapi
class Arena(object):
- """
- A shared memory area backed by anonymous memory (Windows).
- """
+ """
+ A shared memory area backed by anonymous memory (Windows).
+ """
_rand = tempfile._RandomNameSequence()
@@ -56,7 +56,7 @@ if sys.platform == 'win32':
def __setstate__(self, state):
self.size, self.name = self._state = state
- # Reopen existing mmap
+ # Reopen existing mmap
self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
# XXX Temporarily preventing buildbot failures while determining
# XXX the correct long-term fix. See issue 23060
@@ -65,10 +65,10 @@ if sys.platform == 'win32':
else:
class Arena(object):
- """
- A shared memory area backed by a temporary file (POSIX).
- """
-
+ """
+ A shared memory area backed by a temporary file (POSIX).
+ """
+
if sys.platform == 'linux':
_dir_candidates = ['/dev/shm']
else:
@@ -78,8 +78,8 @@ else:
self.size = size
self.fd = fd
if fd == -1:
- # Arena is created anew (if fd != -1, it means we're coming
- # from rebuild_arena() below)
+ # Arena is created anew (if fd != -1, it means we're coming
+ # from rebuild_arena() below)
self.fd, name = tempfile.mkstemp(
prefix='pym-%d-'%os.getpid(),
dir=self._choose_dir(size))
@@ -114,82 +114,82 @@ else:
class Heap(object):
- # Minimum malloc() alignment
+ # Minimum malloc() alignment
_alignment = 8
- _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2 # 4 MB
- _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2
-
+ _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2 # 4 MB
+ _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2
+
def __init__(self, size=mmap.PAGESIZE):
self._lastpid = os.getpid()
self._lock = threading.Lock()
- # Current arena allocation size
+ # Current arena allocation size
self._size = size
- # A sorted list of available block sizes in arenas
+ # A sorted list of available block sizes in arenas
self._lengths = []
-
- # Free block management:
- # - map each block size to a list of `(Arena, start, stop)` blocks
+
+ # Free block management:
+ # - map each block size to a list of `(Arena, start, stop)` blocks
self._len_to_seq = {}
- # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
- # starting at that offset
+ # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
+ # starting at that offset
self._start_to_block = {}
- # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
- # ending at that offset
+ # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
+ # ending at that offset
self._stop_to_block = {}
-
- # Map arenas to their `(Arena, start, stop)` blocks in use
- self._allocated_blocks = defaultdict(set)
+
+ # Map arenas to their `(Arena, start, stop)` blocks in use
+ self._allocated_blocks = defaultdict(set)
self._arenas = []
-
- # List of pending blocks to free - see comment in free() below
+
+ # List of pending blocks to free - see comment in free() below
self._pending_free_blocks = []
- # Statistics
- self._n_mallocs = 0
- self._n_frees = 0
-
+ # Statistics
+ self._n_mallocs = 0
+ self._n_frees = 0
+
@staticmethod
def _roundup(n, alignment):
# alignment must be a power of 2
mask = alignment - 1
return (n + mask) & ~mask
- def _new_arena(self, size):
- # Create a new arena with at least the given *size*
- length = self._roundup(max(self._size, size), mmap.PAGESIZE)
- # We carve larger and larger arenas, for efficiency, until we
- # reach a large-ish size (roughly L3 cache-sized)
- if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
- self._size *= 2
- util.info('allocating a new mmap of length %d', length)
- arena = Arena(length)
- self._arenas.append(arena)
- return (arena, 0, length)
-
- def _discard_arena(self, arena):
- # Possibly delete the given (unused) arena
- length = arena.size
- # Reusing an existing arena is faster than creating a new one, so
- # we only reclaim space if it's large enough.
- if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
- return
- blocks = self._allocated_blocks.pop(arena)
- assert not blocks
- del self._start_to_block[(arena, 0)]
- del self._stop_to_block[(arena, length)]
- self._arenas.remove(arena)
- seq = self._len_to_seq[length]
- seq.remove((arena, 0, length))
- if not seq:
- del self._len_to_seq[length]
- self._lengths.remove(length)
-
+ def _new_arena(self, size):
+ # Create a new arena with at least the given *size*
+ length = self._roundup(max(self._size, size), mmap.PAGESIZE)
+ # We carve larger and larger arenas, for efficiency, until we
+ # reach a large-ish size (roughly L3 cache-sized)
+ if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
+ self._size *= 2
+ util.info('allocating a new mmap of length %d', length)
+ arena = Arena(length)
+ self._arenas.append(arena)
+ return (arena, 0, length)
+
+ def _discard_arena(self, arena):
+ # Possibly delete the given (unused) arena
+ length = arena.size
+ # Reusing an existing arena is faster than creating a new one, so
+ # we only reclaim space if it's large enough.
+ if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
+ return
+ blocks = self._allocated_blocks.pop(arena)
+ assert not blocks
+ del self._start_to_block[(arena, 0)]
+ del self._stop_to_block[(arena, length)]
+ self._arenas.remove(arena)
+ seq = self._len_to_seq[length]
+ seq.remove((arena, 0, length))
+ if not seq:
+ del self._len_to_seq[length]
+ self._lengths.remove(length)
+
def _malloc(self, size):
# returns a large enough block -- it might be much larger
i = bisect.bisect_left(self._lengths, size)
if i == len(self._lengths):
- return self._new_arena(size)
+ return self._new_arena(size)
else:
length = self._lengths[i]
seq = self._len_to_seq[length]
@@ -202,8 +202,8 @@ class Heap(object):
del self._stop_to_block[(arena, stop)]
return block
- def _add_free_block(self, block):
- # make block available and try to merge with its neighbours in the arena
+ def _add_free_block(self, block):
+ # make block available and try to merge with its neighbours in the arena
(arena, start, stop) = block
try:
@@ -247,14 +247,14 @@ class Heap(object):
return start, stop
- def _remove_allocated_block(self, block):
- arena, start, stop = block
- blocks = self._allocated_blocks[arena]
- blocks.remove((start, stop))
- if not blocks:
- # Arena is entirely free, discard it from this process
- self._discard_arena(arena)
-
+ def _remove_allocated_block(self, block):
+ arena, start, stop = block
+ blocks = self._allocated_blocks[arena]
+ blocks.remove((start, stop))
+ if not blocks:
+ # Arena is entirely free, discard it from this process
+ self._discard_arena(arena)
+
def _free_pending_blocks(self):
# Free all the blocks in the pending list - called with the lock held.
while True:
@@ -262,8 +262,8 @@ class Heap(object):
block = self._pending_free_blocks.pop()
except IndexError:
break
- self._add_free_block(block)
- self._remove_allocated_block(block)
+ self._add_free_block(block)
+ self._remove_allocated_block(block)
def free(self, block):
# free a block returned by malloc()
@@ -274,7 +274,7 @@ class Heap(object):
# immediately, the block is added to a list of blocks to be freed
# synchronously sometimes later from malloc() or free(), by calling
# _free_pending_blocks() (appending and retrieving from a list is not
- # strictly thread-safe but under CPython it's atomic thanks to the GIL).
+ # strictly thread-safe but under CPython it's atomic thanks to the GIL).
if os.getpid() != self._lastpid:
raise ValueError(
"My pid ({0:n}) is not last pid {1:n}".format(
@@ -286,10 +286,10 @@ class Heap(object):
else:
# we hold the lock
try:
- self._n_frees += 1
+ self._n_frees += 1
self._free_pending_blocks()
- self._add_free_block(block)
- self._remove_allocated_block(block)
+ self._add_free_block(block)
+ self._remove_allocated_block(block)
finally:
self._lock.release()
@@ -302,21 +302,21 @@ class Heap(object):
if os.getpid() != self._lastpid:
self.__init__() # reinitialize after fork
with self._lock:
- self._n_mallocs += 1
- # allow pending blocks to be marked available
+ self._n_mallocs += 1
+ # allow pending blocks to be marked available
self._free_pending_blocks()
- size = self._roundup(max(size, 1), self._alignment)
+ size = self._roundup(max(size, 1), self._alignment)
(arena, start, stop) = self._malloc(size)
- real_stop = start + size
- if real_stop < stop:
- # if the returned block is larger than necessary, mark
- # the remainder available
- self._add_free_block((arena, real_stop, stop))
- self._allocated_blocks[arena].add((start, real_stop))
- return (arena, start, real_stop)
+ real_stop = start + size
+ if real_stop < stop:
+ # if the returned block is larger than necessary, mark
+ # the remainder available
+ self._add_free_block((arena, real_stop, stop))
+ self._allocated_blocks[arena].add((start, real_stop))
+ return (arena, start, real_stop)
#
-# Class wrapping a block allocated out of a Heap -- can be inherited by child process
+# Class wrapping a block allocated out of a Heap -- can be inherited by child process
#
class BufferWrapper(object):
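
Two numeric details above are worth isolating: _roundup aligns sizes with a power-of-two mask, and arena sizes double until _DOUBLE_ARENA_SIZE_UNTIL (4 MiB). A tiny sketch of the mask trick:

    def roundup(n, alignment):
        # alignment must be a power of 2
        mask = alignment - 1
        return (n + mask) & ~mask

    print(roundup(1, 8), roundup(9, 8), roundup(16, 8))   # 8 16 16
    print(roundup(5000, 4096))                            # 8192 (next page multiple)
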
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/managers.py b/contrib/tools/python3/src/Lib/multiprocessing/managers.py
index dfa566c6fc..047ea27672 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/managers.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/managers.py
@@ -1,5 +1,5 @@
#
-# Module providing manager classes for dealing
+# Module providing manager classes for dealing
# with shared objects
#
# multiprocessing/managers.py
@@ -8,7 +8,7 @@
# Licensed to PSF under a Contributor Agreement.
#
-__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
+__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
#
# Imports
@@ -16,13 +16,13 @@ __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
import sys
import threading
-import signal
+import signal
import array
import queue
import time
-import types
-import os
-from os import getpid
+import types
+import os
+from os import getpid
from traceback import format_exc
@@ -32,13 +32,13 @@ from . import pool
from . import process
from . import util
from . import get_context
-try:
- from . import shared_memory
-except ImportError:
- HAS_SHMEM = False
-else:
- HAS_SHMEM = True
- __all__.append('SharedMemoryManager')
+try:
+ from . import shared_memory
+except ImportError:
+ HAS_SHMEM = False
+else:
+ HAS_SHMEM = True
+ __all__.append('SharedMemoryManager')
#
# Register some things for pickling
@@ -61,7 +61,7 @@ if view_types[0] is not list: # only needed in Py3.0
class Token(object):
'''
- Type to uniquely identify a shared object
+ Type to uniquely identify a shared object
'''
__slots__ = ('typeid', 'address', 'id')
@@ -250,7 +250,7 @@ class Server(object):
try:
obj, exposed, gettypeid = \
self.id_to_local_proxy_obj[ident]
- except KeyError:
+ except KeyError:
raise ke
if methodname not in exposed:
@@ -298,7 +298,7 @@ class Server(object):
try:
try:
send(msg)
- except Exception:
+ except Exception:
send(('#UNSERIALIZABLE', format_exc()))
except Exception as e:
util.info('exception in thread serving %r',
@@ -362,7 +362,7 @@ class Server(object):
finally:
self.stop_event.set()
- def create(self, c, typeid, /, *args, **kwds):
+ def create(self, c, typeid, /, *args, **kwds):
'''
Create a new shared object and return its id
'''
@@ -573,9 +573,9 @@ class BaseManager(object):
'''
Create a server, report its address and run it
'''
- # bpo-36368: protect server process from KeyboardInterrupt signals
- signal.signal(signal.SIGINT, signal.SIG_IGN)
-
+ # bpo-36368: protect server process from KeyboardInterrupt signals
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
if initializer is not None:
initializer(*initargs)
@@ -590,7 +590,7 @@ class BaseManager(object):
util.info('manager serving at %r', server.address)
server.serve_forever()
- def _create(self, typeid, /, *args, **kwds):
+ def _create(self, typeid, /, *args, **kwds):
'''
Create a new shared object; return the token and exposed tuple
'''
@@ -710,7 +710,7 @@ class BaseManager(object):
)
if create_method:
- def temp(self, /, *args, **kwds):
+ def temp(self, /, *args, **kwds):
util.debug('requesting creation of a shared %r object', typeid)
token, exp = self._create(typeid, *args, **kwds)
proxy = proxytype(
@@ -796,7 +796,7 @@ class BaseProxy(object):
def _callmethod(self, methodname, args=(), kwds={}):
'''
- Try to call a method of the referent and return a copy of the result
+ Try to call a method of the referent and return a copy of the result
'''
try:
conn = self._tls.connection
@@ -950,7 +950,7 @@ def MakeProxyType(name, exposed, _cache={}):
dic = {}
for meth in exposed:
- exec('''def %s(self, /, *args, **kwds):
+ exec('''def %s(self, /, *args, **kwds):
return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
ProxyType = type(name, (BaseProxy,), dic)
@@ -989,7 +989,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None,
#
class Namespace(object):
- def __init__(self, /, **kwds):
+ def __init__(self, /, **kwds):
self.__dict__.update(kwds)
def __repr__(self):
items = list(self.__dict__.items())
@@ -1131,9 +1131,9 @@ class ValueProxy(BaseProxy):
return self._callmethod('set', (value,))
value = property(get, set)
- __class_getitem__ = classmethod(types.GenericAlias)
-
+ __class_getitem__ = classmethod(types.GenericAlias)
+
BaseListProxy = MakeProxyType('BaseListProxy', (
'__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
'__mul__', '__reversed__', '__rmul__', '__setitem__',
@@ -1216,155 +1216,155 @@ SyncManager.register('Namespace', Namespace, NamespaceProxy)
# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
-
-#
-# Definition of SharedMemoryManager and SharedMemoryServer
-#
-
-if HAS_SHMEM:
- class _SharedMemoryTracker:
- "Manages one or more shared memory segments."
-
- def __init__(self, name, segment_names=[]):
- self.shared_memory_context_name = name
- self.segment_names = segment_names
-
- def register_segment(self, segment_name):
- "Adds the supplied shared memory block name to tracker."
- util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
- self.segment_names.append(segment_name)
-
- def destroy_segment(self, segment_name):
- """Calls unlink() on the shared memory block with the supplied name
- and removes it from the list of blocks being tracked."""
- util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
- self.segment_names.remove(segment_name)
- segment = shared_memory.SharedMemory(segment_name)
- segment.close()
- segment.unlink()
-
- def unlink(self):
- "Calls destroy_segment() on all tracked shared memory blocks."
- for segment_name in self.segment_names[:]:
- self.destroy_segment(segment_name)
-
- def __del__(self):
- util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
- self.unlink()
-
- def __getstate__(self):
- return (self.shared_memory_context_name, self.segment_names)
-
- def __setstate__(self, state):
- self.__init__(*state)
-
-
- class SharedMemoryServer(Server):
-
- public = Server.public + \
- ['track_segment', 'release_segment', 'list_segments']
-
- def __init__(self, *args, **kwargs):
- Server.__init__(self, *args, **kwargs)
- address = self.address
- # The address of Linux abstract namespaces can be bytes
- if isinstance(address, bytes):
- address = os.fsdecode(address)
- self.shared_memory_context = \
- _SharedMemoryTracker(f"shm_{address}_{getpid()}")
- util.debug(f"SharedMemoryServer started by pid {getpid()}")
-
- def create(self, c, typeid, /, *args, **kwargs):
- """Create a new distributed-shared object (not backed by a shared
- memory block) and return its id to be used in a Proxy Object."""
- # Unless set up as a shared proxy, don't make shared_memory_context
- # a standard part of kwargs. This makes things easier for supplying
- # simple functions.
- if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
- kwargs['shared_memory_context'] = self.shared_memory_context
- return Server.create(self, c, typeid, *args, **kwargs)
-
- def shutdown(self, c):
- "Call unlink() on all tracked shared memory, terminate the Server."
- self.shared_memory_context.unlink()
- return Server.shutdown(self, c)
-
- def track_segment(self, c, segment_name):
- "Adds the supplied shared memory block name to Server's tracker."
- self.shared_memory_context.register_segment(segment_name)
-
- def release_segment(self, c, segment_name):
- """Calls unlink() on the shared memory block with the supplied name
- and removes it from the tracker instance inside the Server."""
- self.shared_memory_context.destroy_segment(segment_name)
-
- def list_segments(self, c):
- """Returns a list of names of shared memory blocks that the Server
- is currently tracking."""
- return self.shared_memory_context.segment_names
-
-
- class SharedMemoryManager(BaseManager):
- """Like SyncManager but uses SharedMemoryServer instead of Server.
-
- It provides methods for creating and returning SharedMemory instances
- and for creating a list-like object (ShareableList) backed by shared
- memory. It also provides methods that create and return Proxy Objects
- that support synchronization across processes (i.e. multi-process-safe
- locks and semaphores).
- """
-
- _Server = SharedMemoryServer
-
- def __init__(self, *args, **kwargs):
- if os.name == "posix":
- # bpo-36867: Ensure the resource_tracker is running before
- # launching the manager process, so that concurrent
- # shared_memory manipulation both in the manager and in the
- # current process does not create two resource_tracker
- # processes.
- from . import resource_tracker
- resource_tracker.ensure_running()
- BaseManager.__init__(self, *args, **kwargs)
- util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
-
- def __del__(self):
- util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
- pass
-
- def get_server(self):
- 'Better than monkeypatching for now; merge into Server ultimately'
- if self._state.value != State.INITIAL:
- if self._state.value == State.STARTED:
- raise ProcessError("Already started SharedMemoryServer")
- elif self._state.value == State.SHUTDOWN:
- raise ProcessError("SharedMemoryManager has shut down")
- else:
- raise ProcessError(
- "Unknown state {!r}".format(self._state.value))
- return self._Server(self._registry, self._address,
- self._authkey, self._serializer)
-
- def SharedMemory(self, size):
- """Returns a new SharedMemory instance with the specified size in
- bytes, to be tracked by the manager."""
- with self._Client(self._address, authkey=self._authkey) as conn:
- sms = shared_memory.SharedMemory(None, create=True, size=size)
- try:
- dispatch(conn, None, 'track_segment', (sms.name,))
- except BaseException as e:
- sms.unlink()
- raise e
- return sms
-
- def ShareableList(self, sequence):
- """Returns a new ShareableList instance populated with the values
- from the input sequence, to be tracked by the manager."""
- with self._Client(self._address, authkey=self._authkey) as conn:
- sl = shared_memory.ShareableList(sequence)
- try:
- dispatch(conn, None, 'track_segment', (sl.shm.name,))
- except BaseException as e:
- sl.shm.unlink()
- raise e
- return sl
+
+#
+# Definition of SharedMemoryManager and SharedMemoryServer
+#
+
+if HAS_SHMEM:
+ class _SharedMemoryTracker:
+ "Manages one or more shared memory segments."
+
+ def __init__(self, name, segment_names=[]):
+ self.shared_memory_context_name = name
+ self.segment_names = segment_names
+
+ def register_segment(self, segment_name):
+ "Adds the supplied shared memory block name to tracker."
+ util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
+ self.segment_names.append(segment_name)
+
+ def destroy_segment(self, segment_name):
+ """Calls unlink() on the shared memory block with the supplied name
+ and removes it from the list of blocks being tracked."""
+ util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
+ self.segment_names.remove(segment_name)
+ segment = shared_memory.SharedMemory(segment_name)
+ segment.close()
+ segment.unlink()
+
+ def unlink(self):
+ "Calls destroy_segment() on all tracked shared memory blocks."
+ for segment_name in self.segment_names[:]:
+ self.destroy_segment(segment_name)
+
+ def __del__(self):
+ util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
+ self.unlink()
+
+ def __getstate__(self):
+ return (self.shared_memory_context_name, self.segment_names)
+
+ def __setstate__(self, state):
+ self.__init__(*state)
+
+
+ class SharedMemoryServer(Server):
+
+ public = Server.public + \
+ ['track_segment', 'release_segment', 'list_segments']
+
+ def __init__(self, *args, **kwargs):
+ Server.__init__(self, *args, **kwargs)
+ address = self.address
+ # The address of Linux abstract namespaces can be bytes
+ if isinstance(address, bytes):
+ address = os.fsdecode(address)
+ self.shared_memory_context = \
+ _SharedMemoryTracker(f"shm_{address}_{getpid()}")
+ util.debug(f"SharedMemoryServer started by pid {getpid()}")
+
+ def create(self, c, typeid, /, *args, **kwargs):
+ """Create a new distributed-shared object (not backed by a shared
+ memory block) and return its id to be used in a Proxy Object."""
+ # Unless set up as a shared proxy, don't make shared_memory_context
+ # a standard part of kwargs. This makes things easier for supplying
+ # simple functions.
+ if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
+ kwargs['shared_memory_context'] = self.shared_memory_context
+ return Server.create(self, c, typeid, *args, **kwargs)
+
+ def shutdown(self, c):
+ "Call unlink() on all tracked shared memory, terminate the Server."
+ self.shared_memory_context.unlink()
+ return Server.shutdown(self, c)
+
+ def track_segment(self, c, segment_name):
+ "Adds the supplied shared memory block name to Server's tracker."
+ self.shared_memory_context.register_segment(segment_name)
+
+ def release_segment(self, c, segment_name):
+ """Calls unlink() on the shared memory block with the supplied name
+ and removes it from the tracker instance inside the Server."""
+ self.shared_memory_context.destroy_segment(segment_name)
+
+ def list_segments(self, c):
+ """Returns a list of names of shared memory blocks that the Server
+ is currently tracking."""
+ return self.shared_memory_context.segment_names
+
+
+ class SharedMemoryManager(BaseManager):
+ """Like SyncManager but uses SharedMemoryServer instead of Server.
+
+ It provides methods for creating and returning SharedMemory instances
+ and for creating a list-like object (ShareableList) backed by shared
+ memory. It also provides methods that create and return Proxy Objects
+ that support synchronization across processes (i.e. multi-process-safe
+ locks and semaphores).
+ """
+
+ _Server = SharedMemoryServer
+
+ def __init__(self, *args, **kwargs):
+ if os.name == "posix":
+ # bpo-36867: Ensure the resource_tracker is running before
+ # launching the manager process, so that concurrent
+ # shared_memory manipulation both in the manager and in the
+ # current process does not create two resource_tracker
+ # processes.
+ from . import resource_tracker
+ resource_tracker.ensure_running()
+ BaseManager.__init__(self, *args, **kwargs)
+ util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
+
+ def __del__(self):
+ util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
+ pass
+
+ def get_server(self):
+ 'Better than monkeypatching for now; merge into Server ultimately'
+ if self._state.value != State.INITIAL:
+ if self._state.value == State.STARTED:
+ raise ProcessError("Already started SharedMemoryServer")
+ elif self._state.value == State.SHUTDOWN:
+ raise ProcessError("SharedMemoryManager has shut down")
+ else:
+ raise ProcessError(
+ "Unknown state {!r}".format(self._state.value))
+ return self._Server(self._registry, self._address,
+ self._authkey, self._serializer)
+
+ def SharedMemory(self, size):
+ """Returns a new SharedMemory instance with the specified size in
+ bytes, to be tracked by the manager."""
+ with self._Client(self._address, authkey=self._authkey) as conn:
+ sms = shared_memory.SharedMemory(None, create=True, size=size)
+ try:
+ dispatch(conn, None, 'track_segment', (sms.name,))
+ except BaseException as e:
+ sms.unlink()
+ raise e
+ return sms
+
+ def ShareableList(self, sequence):
+ """Returns a new ShareableList instance populated with the values
+ from the input sequence, to be tracked by the manager."""
+ with self._Client(self._address, authkey=self._authkey) as conn:
+ sl = shared_memory.ShareableList(sequence)
+ try:
+ dispatch(conn, None, 'track_segment', (sl.shm.name,))
+ except BaseException as e:
+ sl.shm.unlink()
+ raise e
+ return sl
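
The block above is the whole SharedMemoryManager feature (new in Python 3.8). From the user's side, segments created through the manager are unlinked automatically when the server shuts down:

    from multiprocessing.managers import SharedMemoryManager

    if __name__ == '__main__':
        with SharedMemoryManager() as smm:
            sl = smm.ShareableList([1, 2, 3])
            shm = smm.SharedMemory(size=128)
            sl[0] = 99
            print(list(sl))   # [99, 2, 3]
        # on exit the SharedMemoryServer calls unlink() on both segments
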
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/pool.py b/contrib/tools/python3/src/Lib/multiprocessing/pool.py
index bbe05a550c..5ae69156f7 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/pool.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/pool.py
@@ -13,30 +13,30 @@ __all__ = ['Pool', 'ThreadPool']
# Imports
#
-import collections
+import collections
import itertools
import os
-import queue
-import threading
+import queue
+import threading
import time
import traceback
-import types
-import warnings
+import types
+import warnings
# If threading is available then ThreadPool should be provided. Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError
-from .connection import wait
+from .connection import wait
#
# Constants representing the state of a pool
#
-INIT = "INIT"
-RUN = "RUN"
-CLOSE = "CLOSE"
-TERMINATE = "TERMINATE"
+INIT = "INIT"
+RUN = "RUN"
+CLOSE = "CLOSE"
+TERMINATE = "TERMINATE"
#
# Miscellaneous
@@ -147,54 +147,54 @@ def _helper_reraises_exception(ex):
# Class representing a process pool
#
-class _PoolCache(dict):
- """
- Class that implements a cache for the Pool class that will notify
- the pool management threads every time the cache is emptied. The
- notification is done by the use of a queue that is provided when
- instantiating the cache.
- """
- def __init__(self, /, *args, notifier=None, **kwds):
- self.notifier = notifier
- super().__init__(*args, **kwds)
-
- def __delitem__(self, item):
- super().__delitem__(item)
-
- # Notify that the cache is empty. This is important because the
- # pool keeps maintaining workers until the cache gets drained. This
- # eliminates a race condition in which a task is finished after the
- # pool's _handle_workers method has entered another iteration of the
- # loop. In this situation, the only event that can wake up the pool
- # is the cache to be emptied (no more tasks available).
- if not self:
- self.notifier.put(None)
-
+class _PoolCache(dict):
+ """
+ Class that implements a cache for the Pool class that will notify
+ the pool management threads every time the cache is emptied. The
+ notification is done by the use of a queue that is provided when
+ instantiating the cache.
+ """
+ def __init__(self, /, *args, notifier=None, **kwds):
+ self.notifier = notifier
+ super().__init__(*args, **kwds)
+
+ def __delitem__(self, item):
+ super().__delitem__(item)
+
+ # Notify that the cache is empty. This is important because the
+ # pool keeps maintaining workers until the cache gets drained. This
+ # eliminates a race condition in which a task is finished after the
+ # pool's _handle_workers method has entered another iteration of the
+ # loop. In this situation, the only event that can wake up the pool
+ # is the cache to be emptied (no more tasks available).
+ if not self:
+ self.notifier.put(None)
+
class Pool(object):
'''
Class which supports an async version of applying functions to arguments.
'''
_wrap_exception = True
- @staticmethod
- def Process(ctx, *args, **kwds):
- return ctx.Process(*args, **kwds)
+ @staticmethod
+ def Process(ctx, *args, **kwds):
+ return ctx.Process(*args, **kwds)
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):
- # Attributes initialized early to make sure that they exist in
- # __del__() if __init__() raises an exception
- self._pool = []
- self._state = INIT
-
+ # Attributes initialized early to make sure that they exist in
+ # __del__() if __init__() raises an exception
+ self._pool = []
+ self._state = INIT
+
self._ctx = context or get_context()
self._setup_queues()
self._taskqueue = queue.SimpleQueue()
- # The _change_notifier queue exists to wake up self._handle_workers()
- # when the cache (self._cache) is empty or when there is a change in
- # the _state variable of the thread that runs _handle_workers.
- self._change_notifier = self._ctx.SimpleQueue()
- self._cache = _PoolCache(notifier=self._change_notifier)
+ # The _change_notifier queue exists to wake up self._handle_workers()
+ # when the cache (self._cache) is empty or when there is a change in
+ # the _state variable of the thread that runs _handle_workers.
+ self._change_notifier = self._ctx.SimpleQueue()
+ self._cache = _PoolCache(notifier=self._change_notifier)
self._maxtasksperchild = maxtasksperchild
self._initializer = initializer
self._initargs = initargs
@@ -208,24 +208,24 @@ class Pool(object):
raise TypeError('initializer must be a callable')
self._processes = processes
- try:
- self._repopulate_pool()
- except Exception:
- for p in self._pool:
- if p.exitcode is None:
- p.terminate()
- for p in self._pool:
- p.join()
- raise
-
- sentinels = self._get_sentinels()
-
+ try:
+ self._repopulate_pool()
+ except Exception:
+ for p in self._pool:
+ if p.exitcode is None:
+ p.terminate()
+ for p in self._pool:
+ p.join()
+ raise
+
+ sentinels = self._get_sentinels()
+
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
- args=(self._cache, self._taskqueue, self._ctx, self.Process,
- self._processes, self._pool, self._inqueue, self._outqueue,
- self._initializer, self._initargs, self._maxtasksperchild,
- self._wrap_exception, sentinels, self._change_notifier)
+ args=(self._cache, self._taskqueue, self._ctx, self.Process,
+ self._processes, self._pool, self._inqueue, self._outqueue,
+ self._initializer, self._initargs, self._maxtasksperchild,
+ self._wrap_exception, sentinels, self._change_notifier)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
@@ -252,92 +252,92 @@ class Pool(object):
self._terminate = util.Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
- self._change_notifier, self._worker_handler, self._task_handler,
+ self._change_notifier, self._worker_handler, self._task_handler,
self._result_handler, self._cache),
exitpriority=15
)
- self._state = RUN
-
- # Copy globals as function locals to make sure that they are available
- # during Python shutdown when the Pool is destroyed.
- def __del__(self, _warn=warnings.warn, RUN=RUN):
- if self._state == RUN:
- _warn(f"unclosed running multiprocessing pool {self!r}",
- ResourceWarning, source=self)
- if getattr(self, '_change_notifier', None) is not None:
- self._change_notifier.put(None)
-
- def __repr__(self):
- cls = self.__class__
- return (f'<{cls.__module__}.{cls.__qualname__} '
- f'state={self._state} '
- f'pool_size={len(self._pool)}>')
-
- def _get_sentinels(self):
- task_queue_sentinels = [self._outqueue._reader]
- self_notifier_sentinels = [self._change_notifier._reader]
- return [*task_queue_sentinels, *self_notifier_sentinels]
-
- @staticmethod
- def _get_worker_sentinels(workers):
- return [worker.sentinel for worker in
- workers if hasattr(worker, "sentinel")]
-
- @staticmethod
- def _join_exited_workers(pool):
+ self._state = RUN
+
+ # Copy globals as function locals to make sure that they are available
+ # during Python shutdown when the Pool is destroyed.
+ def __del__(self, _warn=warnings.warn, RUN=RUN):
+ if self._state == RUN:
+ _warn(f"unclosed running multiprocessing pool {self!r}",
+ ResourceWarning, source=self)
+ if getattr(self, '_change_notifier', None) is not None:
+ self._change_notifier.put(None)
+
+ def __repr__(self):
+ cls = self.__class__
+ return (f'<{cls.__module__}.{cls.__qualname__} '
+ f'state={self._state} '
+ f'pool_size={len(self._pool)}>')
+
+ def _get_sentinels(self):
+ task_queue_sentinels = [self._outqueue._reader]
+ self_notifier_sentinels = [self._change_notifier._reader]
+ return [*task_queue_sentinels, *self_notifier_sentinels]
+
+ @staticmethod
+ def _get_worker_sentinels(workers):
+ return [worker.sentinel for worker in
+ workers if hasattr(worker, "sentinel")]
+
+ @staticmethod
+ def _join_exited_workers(pool):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
- for i in reversed(range(len(pool))):
- worker = pool[i]
+ for i in reversed(range(len(pool))):
+ worker = pool[i]
if worker.exitcode is not None:
# worker exited
util.debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
- del pool[i]
+ del pool[i]
return cleaned
def _repopulate_pool(self):
- return self._repopulate_pool_static(self._ctx, self.Process,
- self._processes,
- self._pool, self._inqueue,
- self._outqueue, self._initializer,
- self._initargs,
- self._maxtasksperchild,
- self._wrap_exception)
-
- @staticmethod
- def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
- outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception):
+ return self._repopulate_pool_static(self._ctx, self.Process,
+ self._processes,
+ self._pool, self._inqueue,
+ self._outqueue, self._initializer,
+ self._initargs,
+ self._maxtasksperchild,
+ self._wrap_exception)
+
+ @staticmethod
+ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
- for i in range(processes - len(pool)):
- w = Process(ctx, target=worker,
- args=(inqueue, outqueue,
- initializer,
- initargs, maxtasksperchild,
- wrap_exception))
+ for i in range(processes - len(pool)):
+ w = Process(ctx, target=worker,
+ args=(inqueue, outqueue,
+ initializer,
+ initargs, maxtasksperchild,
+ wrap_exception))
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
- pool.append(w)
+ pool.append(w)
util.debug('added worker')
- @staticmethod
- def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
- initializer, initargs, maxtasksperchild,
- wrap_exception):
+ @staticmethod
+ def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
+ initializer, initargs, maxtasksperchild,
+ wrap_exception):
"""Clean up any exited workers and start replacements for them.
"""
- if Pool._join_exited_workers(pool):
- Pool._repopulate_pool_static(ctx, Process, processes, pool,
- inqueue, outqueue, initializer,
- initargs, maxtasksperchild,
- wrap_exception)
+ if Pool._join_exited_workers(pool):
+ Pool._repopulate_pool_static(ctx, Process, processes, pool,
+ inqueue, outqueue, initializer,
+ initargs, maxtasksperchild,
+ wrap_exception)
def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue()
@@ -345,10 +345,10 @@ class Pool(object):
self._quick_put = self._inqueue._writer.send
self._quick_get = self._outqueue._reader.recv
- def _check_running(self):
- if self._state != RUN:
- raise ValueError("Pool not running")
-
+ def _check_running(self):
+ if self._state != RUN:
+ raise ValueError("Pool not running")
+
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
@@ -394,9 +394,9 @@ class Pool(object):
'''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
'''
- self._check_running()
+ self._check_running()
if chunksize == 1:
- result = IMapIterator(self)
+ result = IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable),
@@ -409,7 +409,7 @@ class Pool(object):
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
- result = IMapIterator(self)
+ result = IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
@@ -423,9 +423,9 @@ class Pool(object):
'''
Like `imap()` method but ordering of results is arbitrary.
'''
- self._check_running()
+ self._check_running()
if chunksize == 1:
- result = IMapUnorderedIterator(self)
+ result = IMapUnorderedIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job, func, iterable),
@@ -437,7 +437,7 @@ class Pool(object):
raise ValueError(
"Chunksize must be 1+, not {0!r}".format(chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
- result = IMapUnorderedIterator(self)
+ result = IMapUnorderedIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
@@ -452,8 +452,8 @@ class Pool(object):
'''
Asynchronous version of `apply()` method.
'''
- self._check_running()
- result = ApplyResult(self, callback, error_callback)
+ self._check_running()
+ result = ApplyResult(self, callback, error_callback)
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
return result
@@ -470,7 +470,7 @@ class Pool(object):
'''
Helper function to implement map, starmap and their async counterparts.
'''
- self._check_running()
+ self._check_running()
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
@@ -482,7 +482,7 @@ class Pool(object):
chunksize = 0
task_batches = Pool._get_tasks(func, iterable, chunksize)
- result = MapResult(self, chunksize, len(iterable), callback,
+ result = MapResult(self, chunksize, len(iterable), callback,
error_callback=error_callback)
self._taskqueue.put(
(
@@ -495,30 +495,30 @@ class Pool(object):
return result
@staticmethod
- def _wait_for_updates(sentinels, change_notifier, timeout=None):
- wait(sentinels, timeout=timeout)
- while not change_notifier.empty():
- change_notifier.get()
-
- @classmethod
- def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
- pool, inqueue, outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception, sentinels,
- change_notifier):
+ def _wait_for_updates(sentinels, change_notifier, timeout=None):
+ wait(sentinels, timeout=timeout)
+ while not change_notifier.empty():
+ change_notifier.get()
+
+ @classmethod
+ def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
+ pool, inqueue, outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception, sentinels,
+ change_notifier):
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
- while thread._state == RUN or (cache and thread._state != TERMINATE):
- cls._maintain_pool(ctx, Process, processes, pool, inqueue,
- outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception)
-
- current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
-
- cls._wait_for_updates(current_sentinels, change_notifier)
+ while thread._state == RUN or (cache and thread._state != TERMINATE):
+ cls._maintain_pool(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception)
+
+ current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
+
+ cls._wait_for_updates(current_sentinels, change_notifier)
# send sentinel to stop workers
- taskqueue.put(None)
+ taskqueue.put(None)
util.debug('worker handler exiting')
@staticmethod
@@ -530,7 +530,7 @@ class Pool(object):
try:
# iterating taskseq cannot fail
for task in taskseq:
- if thread._state != RUN:
+ if thread._state != RUN:
util.debug('task handler found thread._state != RUN')
break
try:
@@ -578,7 +578,7 @@ class Pool(object):
util.debug('result handler got EOFError/OSError -- exiting')
return
- if thread._state != RUN:
+ if thread._state != RUN:
assert thread._state == TERMINATE, "Thread not in TERMINATE"
util.debug('result handler found thread._state=TERMINATE')
break
@@ -646,7 +646,7 @@ class Pool(object):
if self._state == RUN:
self._state = CLOSE
self._worker_handler._state = CLOSE
- self._change_notifier.put(None)
+ self._change_notifier.put(None)
def terminate(self):
util.debug('terminating pool')
@@ -675,17 +675,17 @@ class Pool(object):
time.sleep(0)
@classmethod
- def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
+ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
util.debug('finalizing pool')
- # Notify that the worker_handler state has been changed so the
- # _handle_workers loop can be unblocked (and exited) in order to
- # send the finalization sentinel to all the workers.
+ # Notify that the worker_handler state has been changed so the
+ # _handle_workers loop can be unblocked (and exited) in order to
+ # send the finalization sentinel to all the workers.
worker_handler._state = TERMINATE
- change_notifier.put(None)
-
+ change_notifier.put(None)
+
task_handler._state = TERMINATE
util.debug('helping task handler/workers to finish')
@@ -696,7 +696,7 @@ class Pool(object):
"Cannot have cache with result_hander not alive")
result_handler._state = TERMINATE
- change_notifier.put(None)
+ change_notifier.put(None)
outqueue.put(None) # sentinel
# We must wait for the worker handler to exit before terminating
@@ -729,7 +729,7 @@ class Pool(object):
p.join()
def __enter__(self):
- self._check_running()
+ self._check_running()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
@@ -741,14 +741,14 @@ class Pool(object):
class ApplyResult(object):
- def __init__(self, pool, callback, error_callback):
- self._pool = pool
+ def __init__(self, pool, callback, error_callback):
+ self._pool = pool
self._event = threading.Event()
self._job = next(job_counter)
- self._cache = pool._cache
+ self._cache = pool._cache
self._callback = callback
self._error_callback = error_callback
- self._cache[self._job] = self
+ self._cache[self._job] = self
def ready(self):
return self._event.is_set()
@@ -778,10 +778,10 @@ class ApplyResult(object):
self._error_callback(self._value)
self._event.set()
del self._cache[self._job]
- self._pool = None
-
- __class_getitem__ = classmethod(types.GenericAlias)
+ self._pool = None
+ __class_getitem__ = classmethod(types.GenericAlias)
+
AsyncResult = ApplyResult # create alias -- see #17805
#
@@ -790,8 +790,8 @@ AsyncResult = ApplyResult # create alias -- see #17805
class MapResult(ApplyResult):
- def __init__(self, pool, chunksize, length, callback, error_callback):
- ApplyResult.__init__(self, pool, callback,
+ def __init__(self, pool, chunksize, length, callback, error_callback):
+ ApplyResult.__init__(self, pool, callback,
error_callback=error_callback)
self._success = True
self._value = [None] * length
@@ -799,7 +799,7 @@ class MapResult(ApplyResult):
if chunksize <= 0:
self._number_left = 0
self._event.set()
- del self._cache[self._job]
+ del self._cache[self._job]
else:
self._number_left = length//chunksize + bool(length % chunksize)
@@ -813,7 +813,7 @@ class MapResult(ApplyResult):
self._callback(self._value)
del self._cache[self._job]
self._event.set()
- self._pool = None
+ self._pool = None
else:
if not success and self._success:
# only store first exception
@@ -825,7 +825,7 @@ class MapResult(ApplyResult):
self._error_callback(self._value)
del self._cache[self._job]
self._event.set()
- self._pool = None
+ self._pool = None
#
# Class whose instances are returned by `Pool.imap()`
@@ -833,16 +833,16 @@ class MapResult(ApplyResult):
class IMapIterator(object):
- def __init__(self, pool):
- self._pool = pool
+ def __init__(self, pool):
+ self._pool = pool
self._cond = threading.Condition(threading.Lock())
self._job = next(job_counter)
- self._cache = pool._cache
+ self._cache = pool._cache
self._items = collections.deque()
self._index = 0
self._length = None
self._unsorted = {}
- self._cache[self._job] = self
+ self._cache[self._job] = self
def __iter__(self):
return self
@@ -853,14 +853,14 @@ class IMapIterator(object):
item = self._items.popleft()
except IndexError:
if self._index == self._length:
- self._pool = None
+ self._pool = None
raise StopIteration from None
self._cond.wait(timeout)
try:
item = self._items.popleft()
except IndexError:
if self._index == self._length:
- self._pool = None
+ self._pool = None
raise StopIteration from None
raise TimeoutError from None
@@ -886,7 +886,7 @@ class IMapIterator(object):
if self._index == self._length:
del self._cache[self._job]
- self._pool = None
+ self._pool = None
def _set_length(self, length):
with self._cond:
@@ -894,7 +894,7 @@ class IMapIterator(object):
if self._index == self._length:
self._cond.notify()
del self._cache[self._job]
- self._pool = None
+ self._pool = None
#
# Class whose instances are returned by `Pool.imap_unordered()`
@@ -909,7 +909,7 @@ class IMapUnorderedIterator(IMapIterator):
self._cond.notify()
if self._index == self._length:
del self._cache[self._job]
- self._pool = None
+ self._pool = None
#
#
@@ -919,7 +919,7 @@ class ThreadPool(Pool):
_wrap_exception = False
@staticmethod
- def Process(ctx, *args, **kwds):
+ def Process(ctx, *args, **kwds):
from .dummy import Process
return Process(*args, **kwds)
@@ -932,14 +932,14 @@ class ThreadPool(Pool):
self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get
- def _get_sentinels(self):
- return [self._change_notifier._reader]
-
- @staticmethod
- def _get_worker_sentinels(workers):
- return []
-
+ def _get_sentinels(self):
+ return [self._change_notifier._reader]
+
@staticmethod
+ def _get_worker_sentinels(workers):
+ return []
+
+ @staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# drain inqueue, and put sentinels at its head to make workers finish
try:
@@ -949,6 +949,6 @@ class ThreadPool(Pool):
pass
for i in range(size):
inqueue.put(None)
-
- def _wait_for_updates(self, sentinels, change_notifier, timeout):
- time.sleep(timeout)
+
+ def _wait_for_updates(self, sentinels, change_notifier, timeout):
+ time.sleep(timeout)
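
The pool shutdown changes above all revolve around one idea: close() and _terminate_pool() push a None into _change_notifier so the _handle_workers loop wakes immediately instead of polling, and __enter__ refuses a pool that is no longer running. A minimal user-level sketch of that contract, assuming only the documented multiprocessing.Pool API:

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # The context manager checks the pool state on entry and calls
    # terminate() on exit; workers receive their finalization
    # sentinels promptly thanks to the change notifier.
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(square, range(8)))   # [0, 1, 4, ..., 49]

    # Re-entering a terminated pool now fails fast.
    try:
        pool.__enter__()
    except ValueError as exc:
        print('pool is not running:', exc)
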
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py
index 625981cf47..a3da92ef0a 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py
@@ -25,12 +25,12 @@ class Popen(object):
if self.returncode is None:
try:
pid, sts = os.waitpid(self.pid, flag)
- except OSError:
+ except OSError:
# Child process not yet created. See #1731717
# e.errno == errno.ECHILD == 10
return None
if pid == self.pid:
- self.returncode = os.waitstatus_to_exitcode(sts)
+ self.returncode = os.waitstatus_to_exitcode(sts)
return self.returncode
def wait(self, timeout=None):
@@ -62,20 +62,20 @@ class Popen(object):
def _launch(self, process_obj):
code = 1
parent_r, child_w = os.pipe()
- child_r, parent_w = os.pipe()
+ child_r, parent_w = os.pipe()
self.pid = os.fork()
if self.pid == 0:
try:
os.close(parent_r)
- os.close(parent_w)
- code = process_obj._bootstrap(parent_sentinel=child_r)
+ os.close(parent_w)
+ code = process_obj._bootstrap(parent_sentinel=child_r)
finally:
os._exit(code)
else:
os.close(child_w)
- os.close(child_r)
- self.finalizer = util.Finalize(self, util.close_fds,
- (parent_r, parent_w,))
+ os.close(child_r)
+ self.finalizer = util.Finalize(self, util.close_fds,
+ (parent_r, parent_w,))
self.sentinel = parent_r
def close(self):
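
The poll() change above swaps hand-rolled status decoding for os.waitstatus_to_exitcode(), which returns the exit code for a normal exit and the negated signal number for a death by signal. A POSIX-only sketch of that conversion (os.waitstatus_to_exitcode() requires Python 3.9+):

import os
import signal
import sys

if __name__ == '__main__' and sys.platform != 'win32':
    pid = os.fork()
    if pid == 0:
        os._exit(7)                              # child exits normally
    _, status = os.waitpid(pid, 0)
    print(os.waitstatus_to_exitcode(status))     # 7

    pid = os.fork()
    if pid == 0:
        os.kill(os.getpid(), signal.SIGTERM)     # child dies by signal
        os._exit(1)                              # not reached
    _, status = os.waitpid(pid, 0)
    print(os.waitstatus_to_exitcode(status))     # -15 == -signal.SIGTERM
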
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py
index a56eb9bf11..4cd65bad76 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py
@@ -49,11 +49,11 @@ class Popen(popen_fork.Popen):
set_spawning_popen(None)
self.sentinel, w = forkserver.connect_to_new_process(self._fds)
- # Keep a duplicate of the data pipe's write end as a sentinel of the
- # parent process used by the child process.
- _parent_w = os.dup(w)
- self.finalizer = util.Finalize(self, util.close_fds,
- (_parent_w, self.sentinel))
+ # Keep a duplicate of the data pipe's write end as a sentinel of the
+ # parent process used by the child process.
+ _parent_w = os.dup(w)
+ self.finalizer = util.Finalize(self, util.close_fds,
+ (_parent_w, self.sentinel))
with open(w, 'wb', closefd=True) as f:
f.write(buf.getbuffer())
self.pid = forkserver.read_signed(self.sentinel)
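
connect_to_new_process() above returns the status pipe, and the parent now dup()s the write end so the child can use it as a parent sentinel. User code reaches this path simply by selecting the forkserver start method (POSIX only); a minimal sketch:

import multiprocessing

def report():
    parent = multiprocessing.parent_process()
    print('started by', parent.name, 'pid', parent.pid)

if __name__ == '__main__':
    ctx = multiprocessing.get_context('forkserver')
    p = ctx.Process(target=report)
    p.start()
    p.join()
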
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py
index 24b8634523..898b689c38 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py
@@ -36,8 +36,8 @@ class Popen(popen_fork.Popen):
return fd
def _launch(self, process_obj):
- from . import resource_tracker
- tracker_fd = resource_tracker.getfd()
+ from . import resource_tracker
+ tracker_fd = resource_tracker.getfd()
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
@@ -61,12 +61,12 @@ class Popen(popen_fork.Popen):
with open(parent_w, 'wb', closefd=False) as f:
f.write(fp.getbuffer())
finally:
- fds_to_close = []
- for fd in (parent_r, parent_w):
- if fd is not None:
- fds_to_close.append(fd)
- self.finalizer = util.Finalize(self, util.close_fds, fds_to_close)
-
- for fd in (child_r, child_w):
+ fds_to_close = []
+ for fd in (parent_r, parent_w):
if fd is not None:
+ fds_to_close.append(fd)
+ self.finalizer = util.Finalize(self, util.close_fds, fds_to_close)
+
+ for fd in (child_r, child_w):
+ if fd is not None:
os.close(fd)
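
Here the spawn path imports resource_tracker lazily, forwards its fd to the child, and registers a Finalize so the parent ends of the pipes are closed even if writing the preparation data fails. From user code the only visible requirements are the usual spawn hygiene, a picklable target and an import guard, as in this sketch:

import multiprocessing

def greet(name):
    print(f'hello {name} from {multiprocessing.current_process().name}')

if __name__ == '__main__':      # required: spawn re-imports this module
    ctx = multiprocessing.get_context('spawn')
    p = ctx.Process(target=greet, args=('child',))
    p.start()
    p.join()
    print('exit code:', p.exitcode)
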
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py
index 27fe064290..f866c59743 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py
@@ -22,7 +22,7 @@ WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
def _path_eq(p1, p2):
return p1 == p2 or os.path.normcase(p1) == os.path.normcase(p2)
-WINENV = not _path_eq(sys.executable, sys._base_executable)
+WINENV = not _path_eq(sys.executable, sys._base_executable)
def _close_handles(*handles):
@@ -44,12 +44,12 @@ class Popen(object):
def __init__(self, process_obj):
prep_data = spawn.get_preparation_data(process_obj._name)
- # read end of pipe will be duplicated by the child process
+ # read end of pipe will be duplicated by the child process
# -- see spawn_main() in spawn.py.
- #
- # bpo-33929: Previously, the read end of pipe was "stolen" by the child
- # process, but it leaked a handle if the child process had been
- # terminated before it could steal the handle from the parent process.
+ #
+ # bpo-33929: Previously, the read end of pipe was "stolen" by the child
+ # process, but it leaked a handle if the child process had been
+ # terminated before it could steal the handle from the parent process.
rhandle, whandle = _winapi.CreatePipe(None, 0)
wfd = msvcrt.open_osfhandle(whandle, 0)
cmd = spawn.get_command_line(parent_pid=os.getpid(),
@@ -73,7 +73,7 @@ class Popen(object):
try:
hp, ht, pid, tid = _winapi.CreateProcess(
python_exe, cmd,
- None, None, False, 0, env, None, None)
+ None, None, False, 0, env, None, None)
_winapi.CloseHandle(ht)
except:
_winapi.CloseHandle(rhandle)
@@ -84,8 +84,8 @@ class Popen(object):
self.returncode = None
self._handle = hp
self.sentinel = int(hp)
- self.finalizer = util.Finalize(self, _close_handles,
- (self.sentinel, int(rhandle)))
+ self.finalizer = util.Finalize(self, _close_handles,
+ (self.sentinel, int(rhandle)))
# send information to child
set_spawning_popen(self)
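
WINENV above detects when sys.executable is a virtual-environment redirector rather than the real interpreter recorded in sys._base_executable, and the pipe handles are now released through a Finalize instead of relying on the child "stealing" them. A small sketch of the same venv check; note sys._base_executable is a private attribute, hence the getattr guard:

import os
import sys

def _path_eq(p1, p2):
    # os.path.normcase folds case on Windows and is a no-op elsewhere.
    return p1 == p2 or os.path.normcase(p1) == os.path.normcase(p2)

base = getattr(sys, '_base_executable', None)
print('venv redirector in use:',
      base is not None and not _path_eq(sys.executable, base))
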
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/process.py b/contrib/tools/python3/src/Lib/multiprocessing/process.py
index 0b2e0b45b2..8177dcd7be 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/process.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/process.py
@@ -7,8 +7,8 @@
# Licensed to PSF under a Contributor Agreement.
#
-__all__ = ['BaseProcess', 'current_process', 'active_children',
- 'parent_process']
+__all__ = ['BaseProcess', 'current_process', 'active_children',
+ 'parent_process']
#
# Imports
@@ -47,13 +47,13 @@ def active_children():
_cleanup()
return list(_children)
-
-def parent_process():
- '''
- Return process object representing the parent process
- '''
- return _parent_process
-
+
+def parent_process():
+ '''
+ Return process object representing the parent process
+ '''
+ return _parent_process
+
#
#
#
@@ -84,7 +84,7 @@ class BaseProcess(object):
self._identity = _current_process._identity + (count,)
self._config = _current_process._config.copy()
self._parent_pid = os.getpid()
- self._parent_name = _current_process.name
+ self._parent_name = _current_process.name
self._popen = None
self._closed = False
self._target = target
@@ -257,7 +257,7 @@ class BaseProcess(object):
raise ValueError("process not started") from None
def __repr__(self):
- exitcode = None
+ exitcode = None
if self is _current_process:
status = 'started'
elif self._closed:
@@ -267,29 +267,29 @@ class BaseProcess(object):
elif self._popen is None:
status = 'initial'
else:
- exitcode = self._popen.poll()
- if exitcode is not None:
- status = 'stopped'
+ exitcode = self._popen.poll()
+ if exitcode is not None:
+ status = 'stopped'
else:
status = 'started'
- info = [type(self).__name__, 'name=%r' % self._name]
- if self._popen is not None:
- info.append('pid=%s' % self._popen.pid)
- info.append('parent=%s' % self._parent_pid)
- info.append(status)
- if exitcode is not None:
- exitcode = _exitcode_to_name.get(exitcode, exitcode)
- info.append('exitcode=%s' % exitcode)
- if self.daemon:
- info.append('daemon')
- return '<%s>' % ' '.join(info)
+ info = [type(self).__name__, 'name=%r' % self._name]
+ if self._popen is not None:
+ info.append('pid=%s' % self._popen.pid)
+ info.append('parent=%s' % self._parent_pid)
+ info.append(status)
+ if exitcode is not None:
+ exitcode = _exitcode_to_name.get(exitcode, exitcode)
+ info.append('exitcode=%s' % exitcode)
+ if self.daemon:
+ info.append('daemon')
+ return '<%s>' % ' '.join(info)
##
- def _bootstrap(self, parent_sentinel=None):
+ def _bootstrap(self, parent_sentinel=None):
from . import util, context
- global _current_process, _parent_process, _process_counter, _children
+ global _current_process, _parent_process, _process_counter, _children
try:
if self._start_method is not None:
@@ -299,10 +299,10 @@ class BaseProcess(object):
util._close_stdin()
old_process = _current_process
_current_process = self
- _parent_process = _ParentProcess(
- self._parent_name, self._parent_pid, parent_sentinel)
- if threading._HAVE_THREAD_NATIVE_ID:
- threading.main_thread()._set_native_id()
+ _parent_process = _ParentProcess(
+ self._parent_name, self._parent_pid, parent_sentinel)
+ if threading._HAVE_THREAD_NATIVE_ID:
+ threading.main_thread()._set_native_id()
try:
util._finalizer_registry.clear()
util._run_after_forkers()
@@ -317,12 +317,12 @@ class BaseProcess(object):
finally:
util._exit_function()
except SystemExit as e:
- if e.code is None:
- exitcode = 0
- elif isinstance(e.code, int):
- exitcode = e.code
+ if e.code is None:
+ exitcode = 0
+ elif isinstance(e.code, int):
+ exitcode = e.code
else:
- sys.stderr.write(str(e.code) + '\n')
+ sys.stderr.write(str(e.code) + '\n')
exitcode = 1
except:
exitcode = 1
@@ -350,41 +350,41 @@ class AuthenticationString(bytes):
)
return AuthenticationString, (bytes(self),)
-
-#
-# Create object representing the parent process
-#
-
-class _ParentProcess(BaseProcess):
-
- def __init__(self, name, pid, sentinel):
- self._identity = ()
- self._name = name
- self._pid = pid
- self._parent_pid = None
- self._popen = None
- self._closed = False
- self._sentinel = sentinel
- self._config = {}
-
- def is_alive(self):
- from multiprocessing.connection import wait
- return not wait([self._sentinel], timeout=0)
-
- @property
- def ident(self):
- return self._pid
-
- def join(self, timeout=None):
- '''
- Wait until parent process terminates
- '''
- from multiprocessing.connection import wait
- wait([self._sentinel], timeout=timeout)
-
- pid = ident
-
+
#
+# Create object representing the parent process
+#
+
+class _ParentProcess(BaseProcess):
+
+ def __init__(self, name, pid, sentinel):
+ self._identity = ()
+ self._name = name
+ self._pid = pid
+ self._parent_pid = None
+ self._popen = None
+ self._closed = False
+ self._sentinel = sentinel
+ self._config = {}
+
+ def is_alive(self):
+ from multiprocessing.connection import wait
+ return not wait([self._sentinel], timeout=0)
+
+ @property
+ def ident(self):
+ return self._pid
+
+ def join(self, timeout=None):
+ '''
+ Wait until parent process terminates
+ '''
+ from multiprocessing.connection import wait
+ wait([self._sentinel], timeout=timeout)
+
+ pid = ident
+
+#
# Create object representing the main process
#
@@ -412,7 +412,7 @@ class _MainProcess(BaseProcess):
pass
-_parent_process = None
+_parent_process = None
_current_process = _MainProcess()
_process_counter = itertools.count(1)
_children = set()
@@ -426,7 +426,7 @@ _exitcode_to_name = {}
for name, signum in list(signal.__dict__.items()):
if name[:3]=='SIG' and '_' not in name:
- _exitcode_to_name[-signum] = f'-{name}'
+ _exitcode_to_name[-signum] = f'-{name}'
# For debug and leak testing
_dangling = WeakSet()
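
Two user-visible features land in process.py: parent_process(), backed by the _ParentProcess proxy above, and signal names in repr() via _exitcode_to_name. A short sketch exercising both; the -SIGTERM exit code applies on POSIX:

import multiprocessing
import signal
import time

def child():
    parent = multiprocessing.parent_process()
    print('parent is', parent.name, 'pid', parent.pid)
    time.sleep(30)

if __name__ == '__main__':
    p = multiprocessing.Process(target=child)
    p.start()
    time.sleep(0.5)
    p.terminate()                 # SIGTERM on POSIX
    p.join()
    print(p)                      # e.g. <Process ... exitcode=-SIGTERM>
    print(p.exitcode == -signal.SIGTERM)
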
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/queues.py b/contrib/tools/python3/src/Lib/multiprocessing/queues.py
index a290181487..1646d270ec 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/queues.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/queues.py
@@ -14,7 +14,7 @@ import os
import threading
import collections
import time
-import types
+import types
import weakref
import errno
@@ -49,7 +49,7 @@ class Queue(object):
self._sem = ctx.BoundedSemaphore(maxsize)
# For use by concurrent.futures
self._ignore_epipe = False
- self._reset()
+ self._reset()
if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)
@@ -62,17 +62,17 @@ class Queue(object):
def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
- self._reset()
+ self._reset()
def _after_fork(self):
debug('Queue._after_fork()')
- self._reset(after_fork=True)
-
- def _reset(self, after_fork=False):
- if after_fork:
- self._notempty._at_fork_reinit()
- else:
- self._notempty = threading.Condition(threading.Lock())
+ self._reset(after_fork=True)
+
+ def _reset(self, after_fork=False):
+ if after_fork:
+ self._notempty._at_fork_reinit()
+ else:
+ self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque()
self._thread = None
self._jointhread = None
@@ -84,8 +84,8 @@ class Queue(object):
self._poll = self._reader.poll
def put(self, obj, block=True, timeout=None):
- if self._closed:
- raise ValueError(f"Queue {self!r} is closed")
+ if self._closed:
+ raise ValueError(f"Queue {self!r} is closed")
if not self._sem.acquire(block, timeout):
raise Full
@@ -96,8 +96,8 @@ class Queue(object):
self._notempty.notify()
def get(self, block=True, timeout=None):
- if self._closed:
- raise ValueError(f"Queue {self!r} is closed")
+ if self._closed:
+ raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
@@ -307,8 +307,8 @@ class JoinableQueue(Queue):
self._cond, self._unfinished_tasks = state[-2:]
def put(self, obj, block=True, timeout=None):
- if self._closed:
- raise ValueError(f"Queue {self!r} is closed")
+ if self._closed:
+ raise ValueError(f"Queue {self!r} is closed")
if not self._sem.acquire(block, timeout):
raise Full
@@ -346,10 +346,10 @@ class SimpleQueue(object):
else:
self._wlock = ctx.Lock()
- def close(self):
- self._reader.close()
- self._writer.close()
-
+ def close(self):
+ self._reader.close()
+ self._writer.close()
+
def empty(self):
return not self._poll()
@@ -376,5 +376,5 @@ class SimpleQueue(object):
else:
with self._wlock:
self._writer.send_bytes(obj)
-
- __class_getitem__ = classmethod(types.GenericAlias)
+
+ __class_getitem__ = classmethod(types.GenericAlias)
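
The queue changes make misuse fail fast: put() and get() raise ValueError once the queue is closed, _reset() centralizes the state that must be rebuilt after a fork, and SimpleQueue gains an explicit close(). A minimal sketch of the new failure mode, assuming Python 3.9+ for SimpleQueue.close():

import multiprocessing

if __name__ == '__main__':
    q = multiprocessing.Queue()
    q.put('payload')
    print(q.get())                 # 'payload'
    q.close()
    try:
        q.put('too late')
    except ValueError as exc:
        print('rejected:', exc)    # Queue <...> is closed

    sq = multiprocessing.SimpleQueue()
    sq.put(1)
    print(sq.get())
    sq.close()                     # deterministically releases both pipe ends
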
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/reduction.py b/contrib/tools/python3/src/Lib/multiprocessing/reduction.py
index 5593f0682f..d4a7f802ba 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/reduction.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/reduction.py
@@ -68,16 +68,16 @@ if sys.platform == 'win32':
__all__ += ['DupHandle', 'duplicate', 'steal_handle']
import _winapi
- def duplicate(handle, target_process=None, inheritable=False,
- *, source_process=None):
+ def duplicate(handle, target_process=None, inheritable=False,
+ *, source_process=None):
'''Duplicate a handle. (target_process is a handle not a pid!)'''
- current_process = _winapi.GetCurrentProcess()
- if source_process is None:
- source_process = current_process
+ current_process = _winapi.GetCurrentProcess()
+ if source_process is None:
+ source_process = current_process
if target_process is None:
- target_process = current_process
+ target_process = current_process
return _winapi.DuplicateHandle(
- source_process, handle, target_process,
+ source_process, handle, target_process,
0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
def steal_handle(source_pid, handle):
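
duplicate() above gains a keyword-only source_process so a handle can be copied out of another process as well as into one; with both arguments left at None it duplicates within the current process. A Windows-only sketch of that default path (non-Windows platforms never import this branch):

import sys

if sys.platform == 'win32':
    import msvcrt
    from multiprocessing.reduction import duplicate

    # Duplicate the OS handle behind stdout inside our own process;
    # source and target both default to the current process.
    handle = msvcrt.get_osfhandle(sys.stdout.fileno())
    dup = duplicate(handle)
    print('original', handle, '-> duplicate', dup)
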
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py b/contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py
index 66076509a1..aef0c1e84f 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py
@@ -59,7 +59,7 @@ else:
class _ResourceSharer(object):
-    '''Manager for resources using a background thread.'''
+    '''Manager for resources using a background thread.'''
def __init__(self):
self._key = 0
self._cache = {}
@@ -112,7 +112,7 @@ class _ResourceSharer(object):
for key, (send, close) in self._cache.items():
close()
self._cache.clear()
- self._lock._at_fork_reinit()
+ self._lock._at_fork_reinit()
if self._listener is not None:
self._listener.close()
self._listener = None
@@ -132,7 +132,7 @@ class _ResourceSharer(object):
def _serve(self):
if hasattr(signal, 'pthread_sigmask'):
- signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
+ signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
while 1:
try:
with self._listener.accept() as conn:
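
The _serve() change blocks every blockable signal in the sharer's background thread via signal.valid_signals() (Python 3.8+) rather than a hand-built range, keeping signal handling in the main thread. The same pattern works for any daemon thread on POSIX:

import signal
import threading

def serve():
    # Per-thread mask: block everything here; the main thread still
    # receives and handles signals normally.
    if hasattr(signal, 'pthread_sigmask'):
        signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
    print('serving with all signals blocked')

t = threading.Thread(target=serve, daemon=True)
t.start()
t.join()
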
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py b/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py
index c9bfa9b82b..dc24abcae1 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py
@@ -1,231 +1,231 @@
-###############################################################################
-# Server process to keep track of unlinked resources (like shared memory
-# segments, semaphores etc.) and clean them.
-#
-# On Unix we run a server process which keeps track of unlinked
-# resources. The server ignores SIGINT and SIGTERM and reads from a
-# pipe. Every other process of the program has a copy of the writable
-# end of the pipe, so we get EOF when all other processes have exited.
-# Then the server process unlinks any remaining resource names.
-#
-# This is important because there may be system limits for such resources: for
-# instance, the system only supports a limited number of named semaphores, and
-# shared-memory segments live in the RAM. If a python process leaks such a
-# resource, this resource will not be removed till the next reboot. Without
-# this resource tracker process, "killall python" would probably leave unlinked
-# resources.
-
-import os
-import signal
-import sys
-import threading
-import warnings
-
-from . import spawn
-from . import util
-
-__all__ = ['ensure_running', 'register', 'unregister']
-
-_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
-_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
-
-_CLEANUP_FUNCS = {
- 'noop': lambda: None,
-}
-
-if os.name == 'posix':
- import _multiprocessing
- import _posixshmem
-
- _CLEANUP_FUNCS.update({
- 'semaphore': _multiprocessing.sem_unlink,
- 'shared_memory': _posixshmem.shm_unlink,
- })
-
-
-class ResourceTracker(object):
-
- def __init__(self):
- self._lock = threading.Lock()
- self._fd = None
- self._pid = None
-
- def _stop(self):
- with self._lock:
- if self._fd is None:
- # not running
- return
-
- # closing the "alive" file descriptor stops main()
- os.close(self._fd)
- self._fd = None
-
- os.waitpid(self._pid, 0)
- self._pid = None
-
- def getfd(self):
- self.ensure_running()
- return self._fd
-
- def ensure_running(self):
- '''Make sure that resource tracker process is running.
-
- This can be run from any process. Usually a child process will use
- the resource created by its parent.'''
- with self._lock:
- if self._fd is not None:
- # resource tracker was launched before, is it still running?
- if self._check_alive():
- # => still alive
- return
- # => dead, launch it again
- os.close(self._fd)
-
- # Clean-up to avoid dangling processes.
- try:
- # _pid can be None if this process is a child from another
- # python process, which has started the resource_tracker.
- if self._pid is not None:
- os.waitpid(self._pid, 0)
- except ChildProcessError:
- # The resource_tracker has already been terminated.
- pass
- self._fd = None
- self._pid = None
-
- warnings.warn('resource_tracker: process died unexpectedly, '
- 'relaunching. Some resources might leak.')
-
- fds_to_pass = []
- try:
- fds_to_pass.append(sys.stderr.fileno())
- except Exception:
- pass
- cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
- r, w = os.pipe()
- try:
- fds_to_pass.append(r)
-            # process will outlive us, so no need to wait on pid
- exe = spawn.get_executable()
- args = [exe] + util._args_from_interpreter_flags()
- args += ['-c', cmd % r]
- # bpo-33613: Register a signal mask that will block the signals.
- # This signal mask will be inherited by the child that is going
- # to be spawned and will protect the child from a race condition
- # that can make the child die before it registers signal handlers
- # for SIGINT and SIGTERM. The mask is unregistered after spawning
- # the child.
- try:
- if _HAVE_SIGMASK:
- signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
- pid = util.spawnv_passfds(exe, args, fds_to_pass)
- finally:
- if _HAVE_SIGMASK:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
- except:
- os.close(w)
- raise
- else:
- self._fd = w
- self._pid = pid
- finally:
- os.close(r)
-
- def _check_alive(self):
- '''Check that the pipe has not been closed by sending a probe.'''
- try:
- # We cannot use send here as it calls ensure_running, creating
- # a cycle.
- os.write(self._fd, b'PROBE:0:noop\n')
- except OSError:
- return False
- else:
- return True
-
- def register(self, name, rtype):
- '''Register name of resource with resource tracker.'''
- self._send('REGISTER', name, rtype)
-
- def unregister(self, name, rtype):
- '''Unregister name of resource with resource tracker.'''
- self._send('UNREGISTER', name, rtype)
-
- def _send(self, cmd, name, rtype):
- self.ensure_running()
- msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
- if len(name) > 512:
- # posix guarantees that writes to a pipe of less than PIPE_BUF
- # bytes are atomic, and that PIPE_BUF >= 512
- raise ValueError('name too long')
- nbytes = os.write(self._fd, msg)
- assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
- nbytes, len(msg))
-
-
-_resource_tracker = ResourceTracker()
-ensure_running = _resource_tracker.ensure_running
-register = _resource_tracker.register
-unregister = _resource_tracker.unregister
-getfd = _resource_tracker.getfd
-
-def main(fd):
- '''Run resource tracker.'''
- # protect the process from ^C and "killall python" etc
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- if _HAVE_SIGMASK:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
-
- for f in (sys.stdin, sys.stdout):
- try:
- f.close()
- except Exception:
- pass
-
- cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
- try:
- # keep track of registered/unregistered resources
- with open(fd, 'rb') as f:
- for line in f:
- try:
- cmd, name, rtype = line.strip().decode('ascii').split(':')
- cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
- if cleanup_func is None:
- raise ValueError(
- f'Cannot register {name} for automatic cleanup: '
- f'unknown resource type {rtype}')
-
- if cmd == 'REGISTER':
- cache[rtype].add(name)
- elif cmd == 'UNREGISTER':
- cache[rtype].remove(name)
- elif cmd == 'PROBE':
- pass
- else:
- raise RuntimeError('unrecognized command %r' % cmd)
- except Exception:
- try:
- sys.excepthook(*sys.exc_info())
- except:
- pass
- finally:
- # all processes have terminated; cleanup any remaining resources
- for rtype, rtype_cache in cache.items():
- if rtype_cache:
- try:
- warnings.warn('resource_tracker: There appear to be %d '
- 'leaked %s objects to clean up at shutdown' %
- (len(rtype_cache), rtype))
- except Exception:
- pass
- for name in rtype_cache:
- # For some reason the process which created and registered this
- # resource has failed to unregister it. Presumably it has
- # died. We therefore unlink it.
- try:
- try:
- _CLEANUP_FUNCS[rtype](name)
- except Exception as e:
- warnings.warn('resource_tracker: %r: %s' % (name, e))
- finally:
- pass
+###############################################################################
+# Server process to keep track of unlinked resources (like shared memory
+# segments, semaphores etc.) and clean them.
+#
+# On Unix we run a server process which keeps track of unlinked
+# resources. The server ignores SIGINT and SIGTERM and reads from a
+# pipe. Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining resource names.
+#
+# This is important because there may be system limits for such resources: for
+# instance, the system only supports a limited number of named semaphores, and
+# shared-memory segments live in the RAM. If a python process leaks such a
+# resource, this resource will not be removed till the next reboot. Without
+# this resource tracker process, "killall python" would probably leave unlinked
+# resources.
+
+import os
+import signal
+import sys
+import threading
+import warnings
+
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
+_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
+
+_CLEANUP_FUNCS = {
+ 'noop': lambda: None,
+}
+
+if os.name == 'posix':
+ import _multiprocessing
+ import _posixshmem
+
+ _CLEANUP_FUNCS.update({
+ 'semaphore': _multiprocessing.sem_unlink,
+ 'shared_memory': _posixshmem.shm_unlink,
+ })
+
+
+class ResourceTracker(object):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._fd = None
+ self._pid = None
+
+ def _stop(self):
+ with self._lock:
+ if self._fd is None:
+ # not running
+ return
+
+ # closing the "alive" file descriptor stops main()
+ os.close(self._fd)
+ self._fd = None
+
+ os.waitpid(self._pid, 0)
+ self._pid = None
+
+ def getfd(self):
+ self.ensure_running()
+ return self._fd
+
+ def ensure_running(self):
+ '''Make sure that resource tracker process is running.
+
+ This can be run from any process. Usually a child process will use
+ the resource created by its parent.'''
+ with self._lock:
+ if self._fd is not None:
+ # resource tracker was launched before, is it still running?
+ if self._check_alive():
+ # => still alive
+ return
+ # => dead, launch it again
+ os.close(self._fd)
+
+ # Clean-up to avoid dangling processes.
+ try:
+ # _pid can be None if this process is a child from another
+ # python process, which has started the resource_tracker.
+ if self._pid is not None:
+ os.waitpid(self._pid, 0)
+ except ChildProcessError:
+ # The resource_tracker has already been terminated.
+ pass
+ self._fd = None
+ self._pid = None
+
+ warnings.warn('resource_tracker: process died unexpectedly, '
+ 'relaunching. Some resources might leak.')
+
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
+ r, w = os.pipe()
+ try:
+ fds_to_pass.append(r)
+            # process will outlive us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd % r]
+ # bpo-33613: Register a signal mask that will block the signals.
+ # This signal mask will be inherited by the child that is going
+ # to be spawned and will protect the child from a race condition
+ # that can make the child die before it registers signal handlers
+ # for SIGINT and SIGTERM. The mask is unregistered after spawning
+ # the child.
+ try:
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ finally:
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
+ except:
+ os.close(w)
+ raise
+ else:
+ self._fd = w
+ self._pid = pid
+ finally:
+ os.close(r)
+
+ def _check_alive(self):
+ '''Check that the pipe has not been closed by sending a probe.'''
+ try:
+ # We cannot use send here as it calls ensure_running, creating
+ # a cycle.
+ os.write(self._fd, b'PROBE:0:noop\n')
+ except OSError:
+ return False
+ else:
+ return True
+
+ def register(self, name, rtype):
+ '''Register name of resource with resource tracker.'''
+ self._send('REGISTER', name, rtype)
+
+ def unregister(self, name, rtype):
+ '''Unregister name of resource with resource tracker.'''
+ self._send('UNREGISTER', name, rtype)
+
+ def _send(self, cmd, name, rtype):
+ self.ensure_running()
+ msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
+ if len(name) > 512:
+ # posix guarantees that writes to a pipe of less than PIPE_BUF
+ # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('name too long')
+ nbytes = os.write(self._fd, msg)
+ assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
+ nbytes, len(msg))
+
+
+_resource_tracker = ResourceTracker()
+ensure_running = _resource_tracker.ensure_running
+register = _resource_tracker.register
+unregister = _resource_tracker.unregister
+getfd = _resource_tracker.getfd
+
+def main(fd):
+ '''Run resource tracker.'''
+ # protect the process from ^C and "killall python" etc
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
+
+ for f in (sys.stdin, sys.stdout):
+ try:
+ f.close()
+ except Exception:
+ pass
+
+ cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
+ try:
+ # keep track of registered/unregistered resources
+ with open(fd, 'rb') as f:
+ for line in f:
+ try:
+ cmd, name, rtype = line.strip().decode('ascii').split(':')
+ cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
+ if cleanup_func is None:
+ raise ValueError(
+ f'Cannot register {name} for automatic cleanup: '
+ f'unknown resource type {rtype}')
+
+ if cmd == 'REGISTER':
+ cache[rtype].add(name)
+ elif cmd == 'UNREGISTER':
+ cache[rtype].remove(name)
+ elif cmd == 'PROBE':
+ pass
+ else:
+ raise RuntimeError('unrecognized command %r' % cmd)
+ except Exception:
+ try:
+ sys.excepthook(*sys.exc_info())
+ except:
+ pass
+ finally:
+ # all processes have terminated; cleanup any remaining resources
+ for rtype, rtype_cache in cache.items():
+ if rtype_cache:
+ try:
+ warnings.warn('resource_tracker: There appear to be %d '
+ 'leaked %s objects to clean up at shutdown' %
+ (len(rtype_cache), rtype))
+ except Exception:
+ pass
+ for name in rtype_cache:
+ # For some reason the process which created and registered this
+ # resource has failed to unregister it. Presumably it has
+ # died. We therefore unlink it.
+ try:
+ try:
+ _CLEANUP_FUNCS[rtype](name)
+ except Exception as e:
+ warnings.warn('resource_tracker: %r: %s' % (name, e))
+ finally:
+ pass
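
The tracker above speaks a one-line-per-message pipe protocol (REGISTER/UNREGISTER/PROBE) and unlinks whatever is still registered once every client has exited. Ordinary code meets it indirectly through POSIX shared memory, as in this sketch:

from multiprocessing import shared_memory

# Creating a block REGISTERs its name with the resource tracker.
shm = shared_memory.SharedMemory(create=True, size=64)
shm.buf[:5] = b'hello'
print(shm.name, bytes(shm.buf[:5]))

shm.close()      # drop this process's mapping
shm.unlink()     # destroy the block and UNREGISTER it; skipping this
                 # line would trigger the tracker's leak warning at exit
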
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py b/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py
index 122b3fcebf..db0516a993 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py
@@ -1,532 +1,532 @@
-"""Provides shared memory for direct access across processes.
-
-The API of this package is currently provisional. Refer to the
-documentation for details.
-"""
-
-
-__all__ = [ 'SharedMemory', 'ShareableList' ]
-
-
-from functools import partial
-import mmap
-import os
-import errno
-import struct
-import secrets
-import types
-
-if os.name == "nt":
- import _winapi
- _USE_POSIX = False
-else:
- import _posixshmem
- _USE_POSIX = True
-
-
-_O_CREX = os.O_CREAT | os.O_EXCL
-
-# FreeBSD (and perhaps other BSDs) limit names to 14 characters.
-_SHM_SAFE_NAME_LENGTH = 14
-
-# Shared memory block name prefix
-if _USE_POSIX:
- _SHM_NAME_PREFIX = '/psm_'
-else:
- _SHM_NAME_PREFIX = 'wnsm_'
-
-
-def _make_filename():
- "Create a random filename for the shared memory object."
- # number of random bytes to use for name
- nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2
- assert nbytes >= 2, '_SHM_NAME_PREFIX too long'
- name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes)
- assert len(name) <= _SHM_SAFE_NAME_LENGTH
- return name
-
-
-class SharedMemory:
- """Creates a new shared memory block or attaches to an existing
- shared memory block.
-
- Every shared memory block is assigned a unique name. This enables
- one process to create a shared memory block with a particular name
- so that a different process can attach to that same shared memory
- block using that same name.
-
- As a resource for sharing data across processes, shared memory blocks
- may outlive the original process that created them. When one process
- no longer needs access to a shared memory block that might still be
- needed by other processes, the close() method should be called.
- When a shared memory block is no longer needed by any process, the
- unlink() method should be called to ensure proper cleanup."""
-
- # Defaults; enables close() and unlink() to run without errors.
- _name = None
- _fd = -1
- _mmap = None
- _buf = None
- _flags = os.O_RDWR
- _mode = 0o600
- _prepend_leading_slash = True if _USE_POSIX else False
-
- def __init__(self, name=None, create=False, size=0):
- if not size >= 0:
-            raise ValueError("'size' must be a non-negative integer")
- if create:
- self._flags = _O_CREX | os.O_RDWR
- if size == 0:
- raise ValueError("'size' must be a positive number different from zero")
- if name is None and not self._flags & os.O_EXCL:
- raise ValueError("'name' can only be None if create=True")
-
- if _USE_POSIX:
-
- # POSIX Shared Memory
-
- if name is None:
- while True:
- name = _make_filename()
- try:
- self._fd = _posixshmem.shm_open(
- name,
- self._flags,
- mode=self._mode
- )
- except FileExistsError:
- continue
- self._name = name
- break
- else:
- name = "/" + name if self._prepend_leading_slash else name
- self._fd = _posixshmem.shm_open(
- name,
- self._flags,
- mode=self._mode
- )
- self._name = name
- try:
- if create and size:
- os.ftruncate(self._fd, size)
- stats = os.fstat(self._fd)
- size = stats.st_size
- self._mmap = mmap.mmap(self._fd, size)
- except OSError:
- self.unlink()
- raise
-
- from .resource_tracker import register
- register(self._name, "shared_memory")
-
- else:
-
- # Windows Named Shared Memory
-
- if create:
- while True:
- temp_name = _make_filename() if name is None else name
- # Create and reserve shared memory block with this name
- # until it can be attached to by mmap.
- h_map = _winapi.CreateFileMapping(
- _winapi.INVALID_HANDLE_VALUE,
- _winapi.NULL,
- _winapi.PAGE_READWRITE,
- (size >> 32) & 0xFFFFFFFF,
- size & 0xFFFFFFFF,
- temp_name
- )
- try:
- last_error_code = _winapi.GetLastError()
- if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
- if name is not None:
- raise FileExistsError(
- errno.EEXIST,
- os.strerror(errno.EEXIST),
- name,
- _winapi.ERROR_ALREADY_EXISTS
- )
- else:
- continue
- self._mmap = mmap.mmap(-1, size, tagname=temp_name)
- finally:
- _winapi.CloseHandle(h_map)
- self._name = temp_name
- break
-
- else:
- self._name = name
- # Dynamically determine the existing named shared memory
- # block's size which is likely a multiple of mmap.PAGESIZE.
- h_map = _winapi.OpenFileMapping(
- _winapi.FILE_MAP_READ,
- False,
- name
- )
- try:
- p_buf = _winapi.MapViewOfFile(
- h_map,
- _winapi.FILE_MAP_READ,
- 0,
- 0,
- 0
- )
- finally:
- _winapi.CloseHandle(h_map)
- size = _winapi.VirtualQuerySize(p_buf)
- self._mmap = mmap.mmap(-1, size, tagname=name)
-
- self._size = size
- self._buf = memoryview(self._mmap)
-
- def __del__(self):
- try:
- self.close()
- except OSError:
- pass
-
- def __reduce__(self):
- return (
- self.__class__,
- (
- self.name,
- False,
- self.size,
- ),
- )
-
- def __repr__(self):
- return f'{self.__class__.__name__}({self.name!r}, size={self.size})'
-
- @property
- def buf(self):
- "A memoryview of contents of the shared memory block."
- return self._buf
-
- @property
- def name(self):
- "Unique name that identifies the shared memory block."
- reported_name = self._name
- if _USE_POSIX and self._prepend_leading_slash:
- if self._name.startswith("/"):
- reported_name = self._name[1:]
- return reported_name
-
- @property
- def size(self):
- "Size in bytes."
- return self._size
-
- def close(self):
- """Closes access to the shared memory from this instance but does
- not destroy the shared memory block."""
- if self._buf is not None:
- self._buf.release()
- self._buf = None
- if self._mmap is not None:
- self._mmap.close()
- self._mmap = None
- if _USE_POSIX and self._fd >= 0:
- os.close(self._fd)
- self._fd = -1
-
- def unlink(self):
- """Requests that the underlying shared memory block be destroyed.
-
- In order to ensure proper cleanup of resources, unlink should be
- called once (and only once) across all processes which have access
- to the shared memory block."""
- if _USE_POSIX and self._name:
- from .resource_tracker import unregister
- _posixshmem.shm_unlink(self._name)
- unregister(self._name, "shared_memory")
-
-
-_encoding = "utf8"
-
-class ShareableList:
- """Pattern for a mutable list-like object shareable via a shared
- memory block. It differs from the built-in list type in that these
-    lists cannot change their overall length (i.e. no append, insert,
- etc.)
-
- Because values are packed into a memoryview as bytes, the struct
- packing format for any storable value must require no more than 8
- characters to describe its format."""
-
- # The shared memory area is organized as follows:
- # - 8 bytes: number of items (N) as a 64-bit integer
- # - (N + 1) * 8 bytes: offsets of each element from the start of the
- # data area
- # - K bytes: the data area storing item values (with encoding and size
- # depending on their respective types)
- # - N * 8 bytes: `struct` format string for each element
- # - N bytes: index into _back_transforms_mapping for each element
- # (for reconstructing the corresponding Python value)
- _types_mapping = {
- int: "q",
- float: "d",
- bool: "xxxxxxx?",
- str: "%ds",
- bytes: "%ds",
- None.__class__: "xxxxxx?x",
- }
- _alignment = 8
- _back_transforms_mapping = {
- 0: lambda value: value, # int, float, bool
- 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str
- 2: lambda value: value.rstrip(b'\x00'), # bytes
- 3: lambda _value: None, # None
- }
-
- @staticmethod
- def _extract_recreation_code(value):
- """Used in concert with _back_transforms_mapping to convert values
- into the appropriate Python objects when retrieving them from
- the list as well as when storing them."""
- if not isinstance(value, (str, bytes, None.__class__)):
- return 0
- elif isinstance(value, str):
- return 1
- elif isinstance(value, bytes):
- return 2
- else:
- return 3 # NoneType
-
- def __init__(self, sequence=None, *, name=None):
- if name is None or sequence is not None:
- sequence = sequence or ()
- _formats = [
- self._types_mapping[type(item)]
- if not isinstance(item, (str, bytes))
- else self._types_mapping[type(item)] % (
- self._alignment * (len(item) // self._alignment + 1),
- )
- for item in sequence
- ]
- self._list_len = len(_formats)
- assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len
- offset = 0
- # The offsets of each list element into the shared memory's
- # data area (0 meaning the start of the data area, not the start
- # of the shared memory area).
- self._allocated_offsets = [0]
- for fmt in _formats:
- offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
- self._allocated_offsets.append(offset)
- _recreation_codes = [
- self._extract_recreation_code(item) for item in sequence
- ]
- requested_size = struct.calcsize(
- "q" + self._format_size_metainfo +
- "".join(_formats) +
- self._format_packing_metainfo +
- self._format_back_transform_codes
- )
-
- self.shm = SharedMemory(name, create=True, size=requested_size)
- else:
- self.shm = SharedMemory(name)
-
- if sequence is not None:
- _enc = _encoding
- struct.pack_into(
- "q" + self._format_size_metainfo,
- self.shm.buf,
- 0,
- self._list_len,
- *(self._allocated_offsets)
- )
- struct.pack_into(
- "".join(_formats),
- self.shm.buf,
- self._offset_data_start,
- *(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
- )
- struct.pack_into(
- self._format_packing_metainfo,
- self.shm.buf,
- self._offset_packing_formats,
- *(v.encode(_enc) for v in _formats)
- )
- struct.pack_into(
- self._format_back_transform_codes,
- self.shm.buf,
- self._offset_back_transform_codes,
- *(_recreation_codes)
- )
-
- else:
- self._list_len = len(self) # Obtains size from offset 0 in buffer.
- self._allocated_offsets = list(
- struct.unpack_from(
- self._format_size_metainfo,
- self.shm.buf,
- 1 * 8
- )
- )
-
- def _get_packing_format(self, position):
- "Gets the packing format for a single value stored in the list."
- position = position if position >= 0 else position + self._list_len
- if (position >= self._list_len) or (self._list_len < 0):
- raise IndexError("Requested position out of range.")
-
- v = struct.unpack_from(
- "8s",
- self.shm.buf,
- self._offset_packing_formats + position * 8
- )[0]
- fmt = v.rstrip(b'\x00')
- fmt_as_str = fmt.decode(_encoding)
-
- return fmt_as_str
-
- def _get_back_transform(self, position):
- "Gets the back transformation function for a single value."
-
- if (position >= self._list_len) or (self._list_len < 0):
- raise IndexError("Requested position out of range.")
-
- transform_code = struct.unpack_from(
- "b",
- self.shm.buf,
- self._offset_back_transform_codes + position
- )[0]
- transform_function = self._back_transforms_mapping[transform_code]
-
- return transform_function
-
- def _set_packing_format_and_transform(self, position, fmt_as_str, value):
- """Sets the packing format and back transformation code for a
- single value in the list at the specified position."""
-
- if (position >= self._list_len) or (self._list_len < 0):
- raise IndexError("Requested position out of range.")
-
- struct.pack_into(
- "8s",
- self.shm.buf,
- self._offset_packing_formats + position * 8,
- fmt_as_str.encode(_encoding)
- )
-
- transform_code = self._extract_recreation_code(value)
- struct.pack_into(
- "b",
- self.shm.buf,
- self._offset_back_transform_codes + position,
- transform_code
- )
-
- def __getitem__(self, position):
- position = position if position >= 0 else position + self._list_len
- try:
- offset = self._offset_data_start + self._allocated_offsets[position]
- (v,) = struct.unpack_from(
- self._get_packing_format(position),
- self.shm.buf,
- offset
- )
- except IndexError:
- raise IndexError("index out of range")
-
- back_transform = self._get_back_transform(position)
- v = back_transform(v)
-
- return v
-
- def __setitem__(self, position, value):
- position = position if position >= 0 else position + self._list_len
- try:
- item_offset = self._allocated_offsets[position]
- offset = self._offset_data_start + item_offset
- current_format = self._get_packing_format(position)
- except IndexError:
- raise IndexError("assignment index out of range")
-
- if not isinstance(value, (str, bytes)):
- new_format = self._types_mapping[type(value)]
- encoded_value = value
- else:
- allocated_length = self._allocated_offsets[position + 1] - item_offset
-
- encoded_value = (value.encode(_encoding)
- if isinstance(value, str) else value)
- if len(encoded_value) > allocated_length:
- raise ValueError("bytes/str item exceeds available storage")
- if current_format[-1] == "s":
- new_format = current_format
- else:
- new_format = self._types_mapping[str] % (
- allocated_length,
- )
-
- self._set_packing_format_and_transform(
- position,
- new_format,
- value
- )
- struct.pack_into(new_format, self.shm.buf, offset, encoded_value)
-
- def __reduce__(self):
- return partial(self.__class__, name=self.shm.name), ()
-
- def __len__(self):
- return struct.unpack_from("q", self.shm.buf, 0)[0]
-
- def __repr__(self):
- return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'
-
- @property
- def format(self):
- "The struct packing format used by all currently stored items."
- return "".join(
- self._get_packing_format(i) for i in range(self._list_len)
- )
-
- @property
- def _format_size_metainfo(self):
- "The struct packing format used for the items' storage offsets."
- return "q" * (self._list_len + 1)
-
- @property
- def _format_packing_metainfo(self):
- "The struct packing format used for the items' packing formats."
- return "8s" * self._list_len
-
- @property
- def _format_back_transform_codes(self):
- "The struct packing format used for the items' back transforms."
- return "b" * self._list_len
-
- @property
- def _offset_data_start(self):
- # - 8 bytes for the list length
- # - (N + 1) * 8 bytes for the element offsets
- return (self._list_len + 2) * 8
-
- @property
- def _offset_packing_formats(self):
- return self._offset_data_start + self._allocated_offsets[-1]
-
- @property
- def _offset_back_transform_codes(self):
- return self._offset_packing_formats + self._list_len * 8
-
- def count(self, value):
- "L.count(value) -> integer -- return number of occurrences of value."
-
- return sum(value == entry for entry in self)
-
- def index(self, value):
- """L.index(value) -> integer -- return first index of value.
- Raises ValueError if the value is not present."""
-
- for position, entry in enumerate(self):
- if value == entry:
- return position
- else:
- raise ValueError(f"{value!r} not in this container")
-
- __class_getitem__ = classmethod(types.GenericAlias)
+"""Provides shared memory for direct access across processes.
+
+The API of this package is currently provisional. Refer to the
+documentation for details.
+"""
+
+
+__all__ = [ 'SharedMemory', 'ShareableList' ]
+
+
+from functools import partial
+import mmap
+import os
+import errno
+import struct
+import secrets
+import types
+
+if os.name == "nt":
+ import _winapi
+ _USE_POSIX = False
+else:
+ import _posixshmem
+ _USE_POSIX = True
+
+
+_O_CREX = os.O_CREAT | os.O_EXCL
+
+# FreeBSD (and perhaps other BSDs) limit names to 14 characters.
+_SHM_SAFE_NAME_LENGTH = 14
+
+# Shared memory block name prefix
+if _USE_POSIX:
+ _SHM_NAME_PREFIX = '/psm_'
+else:
+ _SHM_NAME_PREFIX = 'wnsm_'
+
+
+def _make_filename():
+ "Create a random filename for the shared memory object."
+ # number of random bytes to use for name
+ nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2
+ assert nbytes >= 2, '_SHM_NAME_PREFIX too long'
+ name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes)
+ assert len(name) <= _SHM_SAFE_NAME_LENGTH
+ return name
+
+
+class SharedMemory:
+ """Creates a new shared memory block or attaches to an existing
+ shared memory block.
+
+ Every shared memory block is assigned a unique name. This enables
+ one process to create a shared memory block with a particular name
+ so that a different process can attach to that same shared memory
+ block using that same name.
+
+ As a resource for sharing data across processes, shared memory blocks
+ may outlive the original process that created them. When one process
+ no longer needs access to a shared memory block that might still be
+ needed by other processes, the close() method should be called.
+ When a shared memory block is no longer needed by any process, the
+ unlink() method should be called to ensure proper cleanup."""
+
+ # Defaults; enables close() and unlink() to run without errors.
+ _name = None
+ _fd = -1
+ _mmap = None
+ _buf = None
+ _flags = os.O_RDWR
+ _mode = 0o600
+ _prepend_leading_slash = True if _USE_POSIX else False
+
+ def __init__(self, name=None, create=False, size=0):
+ if not size >= 0:
+            raise ValueError("'size' must be a non-negative integer")
+ if create:
+ self._flags = _O_CREX | os.O_RDWR
+ if size == 0:
+ raise ValueError("'size' must be a positive number different from zero")
+ if name is None and not self._flags & os.O_EXCL:
+ raise ValueError("'name' can only be None if create=True")
+
+ if _USE_POSIX:
+
+ # POSIX Shared Memory
+
+ if name is None:
+ while True:
+ name = _make_filename()
+ try:
+ self._fd = _posixshmem.shm_open(
+ name,
+ self._flags,
+ mode=self._mode
+ )
+ except FileExistsError:
+ continue
+ self._name = name
+ break
+ else:
+ name = "/" + name if self._prepend_leading_slash else name
+ self._fd = _posixshmem.shm_open(
+ name,
+ self._flags,
+ mode=self._mode
+ )
+ self._name = name
+ try:
+ if create and size:
+ os.ftruncate(self._fd, size)
+ stats = os.fstat(self._fd)
+ size = stats.st_size
+ self._mmap = mmap.mmap(self._fd, size)
+ except OSError:
+ self.unlink()
+ raise
+
+ from .resource_tracker import register
+ register(self._name, "shared_memory")
+
+ else:
+
+ # Windows Named Shared Memory
+
+ if create:
+ while True:
+ temp_name = _make_filename() if name is None else name
+ # Create and reserve shared memory block with this name
+ # until it can be attached to by mmap.
+ h_map = _winapi.CreateFileMapping(
+ _winapi.INVALID_HANDLE_VALUE,
+ _winapi.NULL,
+ _winapi.PAGE_READWRITE,
+ (size >> 32) & 0xFFFFFFFF,
+ size & 0xFFFFFFFF,
+ temp_name
+ )
+ try:
+ last_error_code = _winapi.GetLastError()
+ if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
+ if name is not None:
+ raise FileExistsError(
+ errno.EEXIST,
+ os.strerror(errno.EEXIST),
+ name,
+ _winapi.ERROR_ALREADY_EXISTS
+ )
+ else:
+ continue
+ self._mmap = mmap.mmap(-1, size, tagname=temp_name)
+ finally:
+ _winapi.CloseHandle(h_map)
+ self._name = temp_name
+ break
+
+ else:
+ self._name = name
+ # Dynamically determine the existing named shared memory
+ # block's size which is likely a multiple of mmap.PAGESIZE.
+ h_map = _winapi.OpenFileMapping(
+ _winapi.FILE_MAP_READ,
+ False,
+ name
+ )
+ try:
+ p_buf = _winapi.MapViewOfFile(
+ h_map,
+ _winapi.FILE_MAP_READ,
+ 0,
+ 0,
+ 0
+ )
+ finally:
+ _winapi.CloseHandle(h_map)
+ size = _winapi.VirtualQuerySize(p_buf)
+ self._mmap = mmap.mmap(-1, size, tagname=name)
+
+ self._size = size
+ self._buf = memoryview(self._mmap)
+
+ def __del__(self):
+ try:
+ self.close()
+ except OSError:
+ pass
+
+ def __reduce__(self):
+ return (
+ self.__class__,
+ (
+ self.name,
+ False,
+ self.size,
+ ),
+ )
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({self.name!r}, size={self.size})'
+
+ @property
+ def buf(self):
+ "A memoryview of contents of the shared memory block."
+ return self._buf
+
+ @property
+ def name(self):
+ "Unique name that identifies the shared memory block."
+ reported_name = self._name
+ if _USE_POSIX and self._prepend_leading_slash:
+ if self._name.startswith("/"):
+ reported_name = self._name[1:]
+ return reported_name
+
+ @property
+ def size(self):
+ "Size in bytes."
+ return self._size
+
+ def close(self):
+ """Closes access to the shared memory from this instance but does
+ not destroy the shared memory block."""
+ if self._buf is not None:
+ self._buf.release()
+ self._buf = None
+ if self._mmap is not None:
+ self._mmap.close()
+ self._mmap = None
+ if _USE_POSIX and self._fd >= 0:
+ os.close(self._fd)
+ self._fd = -1
+
+ def unlink(self):
+ """Requests that the underlying shared memory block be destroyed.
+
+ In order to ensure proper cleanup of resources, unlink should be
+ called once (and only once) across all processes which have access
+ to the shared memory block."""
+ if _USE_POSIX and self._name:
+ from .resource_tracker import unregister
+ _posixshmem.shm_unlink(self._name)
+ unregister(self._name, "shared_memory")
+
+
+_encoding = "utf8"
+
+class ShareableList:
+ """Pattern for a mutable list-like object shareable via a shared
+ memory block. It differs from the built-in list type in that these
+    lists cannot change their overall length (i.e. no append, insert,
+ etc.)
+
+ Because values are packed into a memoryview as bytes, the struct
+ packing format for any storable value must require no more than 8
+ characters to describe its format."""
+
+ # The shared memory area is organized as follows:
+ # - 8 bytes: number of items (N) as a 64-bit integer
+ # - (N + 1) * 8 bytes: offsets of each element from the start of the
+ # data area
+ # - K bytes: the data area storing item values (with encoding and size
+ # depending on their respective types)
+ # - N * 8 bytes: `struct` format string for each element
+ # - N bytes: index into _back_transforms_mapping for each element
+ # (for reconstructing the corresponding Python value)
+ _types_mapping = {
+ int: "q",
+ float: "d",
+ bool: "xxxxxxx?",
+ str: "%ds",
+ bytes: "%ds",
+ None.__class__: "xxxxxx?x",
+ }
+ _alignment = 8
+ _back_transforms_mapping = {
+ 0: lambda value: value, # int, float, bool
+ 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str
+ 2: lambda value: value.rstrip(b'\x00'), # bytes
+ 3: lambda _value: None, # None
+ }
+
+ @staticmethod
+ def _extract_recreation_code(value):
+ """Used in concert with _back_transforms_mapping to convert values
+ into the appropriate Python objects when retrieving them from
+ the list as well as when storing them."""
+ if not isinstance(value, (str, bytes, None.__class__)):
+ return 0
+ elif isinstance(value, str):
+ return 1
+ elif isinstance(value, bytes):
+ return 2
+ else:
+ return 3 # NoneType
+
+ def __init__(self, sequence=None, *, name=None):
+ if name is None or sequence is not None:
+ sequence = sequence or ()
+ _formats = [
+ self._types_mapping[type(item)]
+ if not isinstance(item, (str, bytes))
+ else self._types_mapping[type(item)] % (
+ self._alignment * (len(item) // self._alignment + 1),
+ )
+ for item in sequence
+ ]
+ self._list_len = len(_formats)
+ assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len
+ offset = 0
+ # The offsets of each list element into the shared memory's
+ # data area (0 meaning the start of the data area, not the start
+ # of the shared memory area).
+ self._allocated_offsets = [0]
+ for fmt in _formats:
+ offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
+ self._allocated_offsets.append(offset)
+ _recreation_codes = [
+ self._extract_recreation_code(item) for item in sequence
+ ]
+ requested_size = struct.calcsize(
+ "q" + self._format_size_metainfo +
+ "".join(_formats) +
+ self._format_packing_metainfo +
+ self._format_back_transform_codes
+ )
+
+ self.shm = SharedMemory(name, create=True, size=requested_size)
+ else:
+ self.shm = SharedMemory(name)
+
+ if sequence is not None:
+ _enc = _encoding
+ struct.pack_into(
+ "q" + self._format_size_metainfo,
+ self.shm.buf,
+ 0,
+ self._list_len,
+ *(self._allocated_offsets)
+ )
+ struct.pack_into(
+ "".join(_formats),
+ self.shm.buf,
+ self._offset_data_start,
+ *(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
+ )
+ struct.pack_into(
+ self._format_packing_metainfo,
+ self.shm.buf,
+ self._offset_packing_formats,
+ *(v.encode(_enc) for v in _formats)
+ )
+ struct.pack_into(
+ self._format_back_transform_codes,
+ self.shm.buf,
+ self._offset_back_transform_codes,
+ *(_recreation_codes)
+ )
+
+ else:
+ self._list_len = len(self) # Obtains size from offset 0 in buffer.
+ self._allocated_offsets = list(
+ struct.unpack_from(
+ self._format_size_metainfo,
+ self.shm.buf,
+ 1 * 8
+ )
+ )
+
+ def _get_packing_format(self, position):
+ "Gets the packing format for a single value stored in the list."
+ position = position if position >= 0 else position + self._list_len
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ v = struct.unpack_from(
+ "8s",
+ self.shm.buf,
+ self._offset_packing_formats + position * 8
+ )[0]
+ fmt = v.rstrip(b'\x00')
+ fmt_as_str = fmt.decode(_encoding)
+
+ return fmt_as_str
+
+ def _get_back_transform(self, position):
+ "Gets the back transformation function for a single value."
+
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ transform_code = struct.unpack_from(
+ "b",
+ self.shm.buf,
+ self._offset_back_transform_codes + position
+ )[0]
+ transform_function = self._back_transforms_mapping[transform_code]
+
+ return transform_function
+
+ def _set_packing_format_and_transform(self, position, fmt_as_str, value):
+ """Sets the packing format and back transformation code for a
+ single value in the list at the specified position."""
+
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ struct.pack_into(
+ "8s",
+ self.shm.buf,
+ self._offset_packing_formats + position * 8,
+ fmt_as_str.encode(_encoding)
+ )
+
+ transform_code = self._extract_recreation_code(value)
+ struct.pack_into(
+ "b",
+ self.shm.buf,
+ self._offset_back_transform_codes + position,
+ transform_code
+ )
+
+ def __getitem__(self, position):
+ position = position if position >= 0 else position + self._list_len
+ try:
+ offset = self._offset_data_start + self._allocated_offsets[position]
+ (v,) = struct.unpack_from(
+ self._get_packing_format(position),
+ self.shm.buf,
+ offset
+ )
+ except IndexError:
+ raise IndexError("index out of range")
+
+ back_transform = self._get_back_transform(position)
+ v = back_transform(v)
+
+ return v
+
+ def __setitem__(self, position, value):
+ position = position if position >= 0 else position + self._list_len
+ try:
+ item_offset = self._allocated_offsets[position]
+ offset = self._offset_data_start + item_offset
+ current_format = self._get_packing_format(position)
+ except IndexError:
+ raise IndexError("assignment index out of range")
+
+ if not isinstance(value, (str, bytes)):
+ new_format = self._types_mapping[type(value)]
+ encoded_value = value
+ else:
+ allocated_length = self._allocated_offsets[position + 1] - item_offset
+
+ encoded_value = (value.encode(_encoding)
+ if isinstance(value, str) else value)
+ if len(encoded_value) > allocated_length:
+ raise ValueError("bytes/str item exceeds available storage")
+ if current_format[-1] == "s":
+ new_format = current_format
+ else:
+ new_format = self._types_mapping[str] % (
+ allocated_length,
+ )
+
+ self._set_packing_format_and_transform(
+ position,
+ new_format,
+ value
+ )
+ struct.pack_into(new_format, self.shm.buf, offset, encoded_value)
+
+ def __reduce__(self):
+ return partial(self.__class__, name=self.shm.name), ()
+
+ def __len__(self):
+ return struct.unpack_from("q", self.shm.buf, 0)[0]
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'
+
+ @property
+ def format(self):
+ "The struct packing format used by all currently stored items."
+ return "".join(
+ self._get_packing_format(i) for i in range(self._list_len)
+ )
+
+ @property
+ def _format_size_metainfo(self):
+ "The struct packing format used for the items' storage offsets."
+ return "q" * (self._list_len + 1)
+
+ @property
+ def _format_packing_metainfo(self):
+ "The struct packing format used for the items' packing formats."
+ return "8s" * self._list_len
+
+ @property
+ def _format_back_transform_codes(self):
+ "The struct packing format used for the items' back transforms."
+ return "b" * self._list_len
+
+ @property
+ def _offset_data_start(self):
+ # - 8 bytes for the list length
+ # - (N + 1) * 8 bytes for the element offsets
+ return (self._list_len + 2) * 8
+
+ @property
+ def _offset_packing_formats(self):
+ return self._offset_data_start + self._allocated_offsets[-1]
+
+ @property
+ def _offset_back_transform_codes(self):
+ return self._offset_packing_formats + self._list_len * 8
+
+ def count(self, value):
+ "L.count(value) -> integer -- return number of occurrences of value."
+
+ return sum(value == entry for entry in self)
+
+ def index(self, value):
+ """L.index(value) -> integer -- return first index of value.
+ Raises ValueError if the value is not present."""
+
+ for position, entry in enumerate(self):
+ if value == entry:
+ return position
+ else:
+ raise ValueError(f"{value!r} not in this container")
+
+ __class_getitem__ = classmethod(types.GenericAlias)
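To illustrate the ShareableList layout and fixed-length semantics described in the docstring and layout comments above, here is a minimal usage sketch (the variable names and example values are illustrative, not part of the diff):

    from multiprocessing import shared_memory

    # Create a list backed by a fresh shared memory block. Its length is
    # fixed at creation; str/bytes items get 8-byte-aligned slots.
    sl = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True])

    # A second handle (typically in another process) attaches by name;
    # __reduce__ above makes the object picklable the same way.
    same_sl = shared_memory.ShareableList(name=sl.shm.name)
    same_sl[3] = 42           # mutation is visible through both handles
    assert sl[3] == 42

    # str/bytes items may only be replaced within their allocated slot:
    sl[0] = 'hi'              # fits in the 8 bytes reserved for 'howdy'
    # sl[0] = 'howdy-doodle'  # would raise ValueError (12 > 8 bytes)

    # Close every handle; unlink the block exactly once across all processes.
    same_sl.shm.close()
    sl.shm.close()
    sl.shm.unlink()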
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/spawn.py b/contrib/tools/python3/src/Lib/multiprocessing/spawn.py
index f7f8deb246..2ac0b653b6 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/spawn.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/spawn.py
@@ -96,28 +96,28 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
assert is_forking(sys.argv), "Not forking"
if sys.platform == 'win32':
import msvcrt
- import _winapi
-
- if parent_pid is not None:
- source_process = _winapi.OpenProcess(
- _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE,
- False, parent_pid)
- else:
- source_process = None
- new_handle = reduction.duplicate(pipe_handle,
- source_process=source_process)
+ import _winapi
+
+ if parent_pid is not None:
+ source_process = _winapi.OpenProcess(
+ _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE,
+ False, parent_pid)
+ else:
+ source_process = None
+ new_handle = reduction.duplicate(pipe_handle,
+ source_process=source_process)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
- parent_sentinel = source_process
+ parent_sentinel = source_process
else:
- from . import resource_tracker
- resource_tracker._resource_tracker._fd = tracker_fd
+ from . import resource_tracker
+ resource_tracker._resource_tracker._fd = tracker_fd
fd = pipe_handle
- parent_sentinel = os.dup(pipe_handle)
- exitcode = _main(fd, parent_sentinel)
+ parent_sentinel = os.dup(pipe_handle)
+ exitcode = _main(fd, parent_sentinel)
sys.exit(exitcode)
-def _main(fd, parent_sentinel):
+def _main(fd, parent_sentinel):
with os.fdopen(fd, 'rb', closefd=True) as from_parent:
process.current_process()._inheriting = True
try:
@@ -126,7 +126,7 @@ def _main(fd, parent_sentinel):
self = reduction.pickle.load(from_parent)
finally:
del process.current_process()._inheriting
- return self._bootstrap(parent_sentinel)
+ return self._bootstrap(parent_sentinel)
def _check_not_importing_main():
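The spawn_main/_main pair above runs in the child; on the parent side, code using the 'spawn' start method must sit behind the `__main__` guard that _check_not_importing_main enforces, because the child re-imports the main module. A minimal sketch of the user-side idiom (the worker function and queue names are illustrative):

    import multiprocessing as mp

    def worker(q):
        q.put('hello from the child')

    if __name__ == '__main__':
        # Under 'spawn' the child re-imports this module, so unguarded
        # process creation at module level would recurse indefinitely.
        mp.set_start_method('spawn')
        q = mp.Queue()
        p = mp.Process(target=worker, args=(q,))
        p.start()
        print(q.get())
        p.join()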
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py b/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py
index d0be48f1fd..881c823d28 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py
@@ -76,16 +76,16 @@ class SemLock(object):
# We only get here if we are on Unix with forking
# disabled. When the object is garbage collected or the
# process shuts down we unlink the semaphore name
- from .resource_tracker import register
- register(self._semlock.name, "semaphore")
+ from .resource_tracker import register
+ register(self._semlock.name, "semaphore")
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
exitpriority=0)
@staticmethod
def _cleanup(name):
- from .resource_tracker import unregister
+ from .resource_tracker import unregister
sem_unlink(name)
- unregister(name, "semaphore")
+ unregister(name, "semaphore")
def _make_methods(self):
self.acquire = self._semlock.acquire
@@ -270,7 +270,7 @@ class Condition(object):
def notify(self, n=1):
assert self._lock._semlock._is_mine(), 'lock is not owned'
assert not self._wait_semaphore.acquire(
- False), ('notify: Should not have been able to acquire '
+            False), ('notify: Should not have been able to acquire '
+ '_wait_semaphore')
# to take account of timeouts since last notify*() we subtract
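The notify() assertions in the hunk above encode the usual contract: the caller must hold the condition's lock, and no waiter may already own the internal _wait_semaphore. A minimal sketch of that contract from user code (the process and event names are illustrative):

    import multiprocessing as mp

    def waiter(cond, ready):
        with cond:
            ready.set()          # tell the notifier we hold the lock
            cond.wait()          # releases the lock while blocked
            print('woken up')

    if __name__ == '__main__':
        cond = mp.Condition()
        ready = mp.Event()
        p = mp.Process(target=waiter, args=(cond, ready))
        p.start()
        ready.wait()             # don't notify before the child is waiting
        with cond:               # notify() asserts the lock is owned
            cond.notify()
        p.join()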
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/util.py b/contrib/tools/python3/src/Lib/multiprocessing/util.py
index e94466be8e..fed9ddf6ee 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/util.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/util.py
@@ -102,42 +102,42 @@ def log_to_stderr(level=None):
_log_to_stderr = True
return _logger
-
-# Abstract socket support
-
-def _platform_supports_abstract_sockets():
- if sys.platform == "linux":
- return True
- if hasattr(sys, 'getandroidapilevel'):
- return True
- return False
-
-
-def is_abstract_socket_namespace(address):
- if not address:
- return False
- if isinstance(address, bytes):
- return address[0] == 0
- elif isinstance(address, str):
- return address[0] == "\0"
-    raise TypeError(f'address type of {address!r} unrecognized')
-
-
-abstract_sockets_supported = _platform_supports_abstract_sockets()
-
+
+# Abstract socket support
+
+def _platform_supports_abstract_sockets():
+ if sys.platform == "linux":
+ return True
+ if hasattr(sys, 'getandroidapilevel'):
+ return True
+ return False
+
+
+def is_abstract_socket_namespace(address):
+ if not address:
+ return False
+ if isinstance(address, bytes):
+ return address[0] == 0
+ elif isinstance(address, str):
+ return address[0] == "\0"
+    raise TypeError(f'address type of {address!r} unrecognized')
+
+
+abstract_sockets_supported = _platform_supports_abstract_sockets()
+
#
# Function returning a temp directory which will be removed on exit
#
-def _remove_temp_dir(rmtree, tempdir):
- rmtree(tempdir)
-
- current_process = process.current_process()
- # current_process() can be None if the finalizer is called
- # late during Python finalization
- if current_process is not None:
- current_process._config['tempdir'] = None
-
+def _remove_temp_dir(rmtree, tempdir):
+ rmtree(tempdir)
+
+ current_process = process.current_process()
+ # current_process() can be None if the finalizer is called
+ # late during Python finalization
+ if current_process is not None:
+ current_process._config['tempdir'] = None
+
def get_temp_dir():
# get name of a temp directory which will be automatically cleaned up
tempdir = process.current_process()._config.get('tempdir')
@@ -145,10 +145,10 @@ def get_temp_dir():
import shutil, tempfile
tempdir = tempfile.mkdtemp(prefix='pymp-')
info('created temp directory %s', tempdir)
- # keep a strong reference to shutil.rmtree(), since the finalizer
- # can be called late during Python shutdown
- Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
- exitpriority=-100)
+ # keep a strong reference to shutil.rmtree(), since the finalizer
+ # can be called late during Python shutdown
+ Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
+ exitpriority=-100)
process.current_process()._config['tempdir'] = tempdir
return tempdir
@@ -261,7 +261,7 @@ class Finalize(object):
if self._kwargs:
x += ', kwargs=' + str(self._kwargs)
if self._key[0] is not None:
- x += ', exitpriority=' + str(self._key[0])
+ x += ', exitpriority=' + str(self._key[0])
return x + '>'
@@ -370,11 +370,11 @@ class ForkAwareThreadLock(object):
self._lock = threading.Lock()
self.acquire = self._lock.acquire
self.release = self._lock.release
- register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)
-
- def _at_fork_reinit(self):
- self._lock._at_fork_reinit()
+        register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)
+
+    def _at_fork_reinit(self):
+        self._lock._at_fork_reinit()
def __enter__(self):
return self._lock.__enter__()
@@ -467,38 +467,38 @@ def spawnv_passfds(path, args, passfds):
return _posixsubprocess.fork_exec(
args, [os.fsencode(path)], True, passfds, None, _env_list(),
-1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
- False, False, None, None, None, -1, None)
+ False, False, None, None, None, -1, None)
finally:
os.close(errpipe_read)
os.close(errpipe_write)
-
-
-def close_fds(*fds):
- """Close each file descriptor given as an argument"""
- for fd in fds:
- os.close(fd)
-
-
-def _cleanup_tests():
- """Cleanup multiprocessing resources when multiprocessing tests
- completed."""
-
- from test import support
-
- # cleanup multiprocessing
- process._cleanup()
-
- # Stop the ForkServer process if it's running
- from multiprocessing import forkserver
- forkserver._forkserver._stop()
-
- # Stop the ResourceTracker process if it's running
- from multiprocessing import resource_tracker
- resource_tracker._resource_tracker._stop()
-
- # bpo-37421: Explicitly call _run_finalizers() to remove immediately
- # temporary directories created by multiprocessing.util.get_temp_dir().
- _run_finalizers()
- support.gc_collect()
-
- support.reap_children()
+
+
+def close_fds(*fds):
+ """Close each file descriptor given as an argument"""
+ for fd in fds:
+ os.close(fd)
+
+
+def _cleanup_tests():
+ """Cleanup multiprocessing resources when multiprocessing tests
+ completed."""
+
+ from test import support
+
+ # cleanup multiprocessing
+ process._cleanup()
+
+ # Stop the ForkServer process if it's running
+ from multiprocessing import forkserver
+ forkserver._forkserver._stop()
+
+ # Stop the ResourceTracker process if it's running
+ from multiprocessing import resource_tracker
+ resource_tracker._resource_tracker._stop()
+
+ # bpo-37421: Explicitly call _run_finalizers() to remove immediately
+ # temporary directories created by multiprocessing.util.get_temp_dir().
+ _run_finalizers()
+ support.gc_collect()
+
+ support.reap_children()
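For reference, the abstract-socket helpers restored above behave as follows (the address values are illustrative; abstract socket namespaces exist only on Linux and Android):

    from multiprocessing import util

    # An address whose first byte/character is NUL names an abstract
    # socket, which needs no filesystem cleanup on exit.
    assert util.is_abstract_socket_namespace('\0mp-listener')
    assert util.is_abstract_socket_namespace(b'\x00mp-listener')
    assert not util.is_abstract_socket_namespace('/tmp/pymp-abc/listener')

    # Computed once at import time; True only on Linux/Android.
    print(util.abstract_sockets_supported)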