summaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/concurrent
diff options
context:
space:
mode:
authorarcadia-devtools <[email protected]>2022-06-09 19:02:01 +0300
committerarcadia-devtools <[email protected]>2022-06-09 19:02:01 +0300
commit4a29d649866ff133e0b8f8a1009e1000a44d7279 (patch)
tree547229aded91b3760628c646a144af604f1c3e2b /contrib/tools/python3/src/Lib/concurrent
parent782f2445a283aed9a66e699137b3349af1689c29 (diff)
intermediate changes
ref:478170c7a5a1c0788ddd0d6513ce4ed86d7d7c99
Diffstat (limited to 'contrib/tools/python3/src/Lib/concurrent')
-rw-r--r--contrib/tools/python3/src/Lib/concurrent/futures/_base.py2
-rw-r--r--contrib/tools/python3/src/Lib/concurrent/futures/process.py45
2 files changed, 37 insertions, 10 deletions
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py
index 5c00f2edbe5..cf119ac6437 100644
--- a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py
+++ b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py
@@ -381,7 +381,7 @@ class Future(object):
return self._state == RUNNING
def done(self):
- """Return True of the future was cancelled or finished executing."""
+ """Return True if the future was cancelled or finished executing."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/process.py b/contrib/tools/python3/src/Lib/concurrent/futures/process.py
index 6ee2ce626e4..57941e485d8 100644
--- a/contrib/tools/python3/src/Lib/concurrent/futures/process.py
+++ b/contrib/tools/python3/src/Lib/concurrent/futures/process.py
@@ -126,6 +126,9 @@ class _ExceptionWithTraceback:
tb = traceback.format_exception(type(exc), exc, tb)
tb = ''.join(tb)
self.exc = exc
+ # Traceback object needs to be garbage-collected as its frames
+ # contain references to all the objects in the exception scope
+ self.exc.__traceback__ = None
self.tb = '\n"""\n%s"""' % tb
def __reduce__(self):
return _rebuild_exc, (self.exc, self.tb)
@@ -612,6 +615,10 @@ class ProcessPoolExecutor(_base.Executor):
mp_context = mp.get_context()
self._mp_context = mp_context
+ # https://github.com/python/cpython/issues/90622
+ self._safe_to_dynamically_spawn_children = (
+ self._mp_context.get_start_method(allow_none=False) != "fork")
+
if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")
self._initializer = initializer
@@ -662,6 +669,8 @@ class ProcessPoolExecutor(_base.Executor):
def _start_executor_manager_thread(self):
if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
+ if not self._safe_to_dynamically_spawn_children: # ie, using fork.
+ self._launch_processes()
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
_threads_wakeups[self._executor_manager_thread] = \
@@ -674,14 +683,31 @@ class ProcessPoolExecutor(_base.Executor):
process_count = len(self._processes)
if process_count < self._max_workers:
- p = self._mp_context.Process(
- target=_process_worker,
- args=(self._call_queue,
- self._result_queue,
- self._initializer,
- self._initargs))
- p.start()
- self._processes[p.pid] = p
+ # Assertion disabled as this codepath is also used to replace a
+ # worker that unexpectedly dies, even when using the 'fork' start
+ # method. That means there is still a potential deadlock bug. If a
+ # 'fork' mp_context worker dies, we'll be forking a new one when
+ # we know a thread is running (self._executor_manager_thread).
+ #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
+ self._spawn_process()
+
+ def _launch_processes(self):
+ # https://github.com/python/cpython/issues/90622
+ assert not self._executor_manager_thread, (
+ 'Processes cannot be fork()ed after the thread has started, '
+ 'deadlock in the child processes could result.')
+ for _ in range(len(self._processes), self._max_workers):
+ self._spawn_process()
+
+ def _spawn_process(self):
+ p = self._mp_context.Process(
+ target=_process_worker,
+ args=(self._call_queue,
+ self._result_queue,
+ self._initializer,
+ self._initargs))
+ p.start()
+ self._processes[p.pid] = p
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
@@ -702,7 +728,8 @@ class ProcessPoolExecutor(_base.Executor):
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()
- self._adjust_process_count()
+ if self._safe_to_dynamically_spawn_children:
+ self._adjust_process_count()
self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__