diff options
| author | arcadia-devtools <[email protected]> | 2022-06-09 19:02:01 +0300 |
|---|---|---|
| committer | arcadia-devtools <[email protected]> | 2022-06-09 19:02:01 +0300 |
| commit | 4a29d649866ff133e0b8f8a1009e1000a44d7279 (patch) | |
| tree | 547229aded91b3760628c646a144af604f1c3e2b /contrib/tools/python3/src/Lib/concurrent | |
| parent | 782f2445a283aed9a66e699137b3349af1689c29 (diff) | |
intermediate changes
ref:478170c7a5a1c0788ddd0d6513ce4ed86d7d7c99
Diffstat (limited to 'contrib/tools/python3/src/Lib/concurrent')
| -rw-r--r-- | contrib/tools/python3/src/Lib/concurrent/futures/_base.py | 2 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/concurrent/futures/process.py | 45 |
2 files changed, 37 insertions, 10 deletions
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py index 5c00f2edbe5..cf119ac6437 100644 --- a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py +++ b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py @@ -381,7 +381,7 @@ class Future(object): return self._state == RUNNING def done(self): - """Return True of the future was cancelled or finished executing.""" + """Return True if the future was cancelled or finished executing.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/process.py b/contrib/tools/python3/src/Lib/concurrent/futures/process.py index 6ee2ce626e4..57941e485d8 100644 --- a/contrib/tools/python3/src/Lib/concurrent/futures/process.py +++ b/contrib/tools/python3/src/Lib/concurrent/futures/process.py @@ -126,6 +126,9 @@ class _ExceptionWithTraceback: tb = traceback.format_exception(type(exc), exc, tb) tb = ''.join(tb) self.exc = exc + # Traceback object needs to be garbage-collected as its frames + # contain references to all the objects in the exception scope + self.exc.__traceback__ = None self.tb = '\n"""\n%s"""' % tb def __reduce__(self): return _rebuild_exc, (self.exc, self.tb) @@ -612,6 +615,10 @@ class ProcessPoolExecutor(_base.Executor): mp_context = mp.get_context() self._mp_context = mp_context + # https://github.com/python/cpython/issues/90622 + self._safe_to_dynamically_spawn_children = ( + self._mp_context.get_start_method(allow_none=False) != "fork") + if initializer is not None and not callable(initializer): raise TypeError("initializer must be a callable") self._initializer = initializer @@ -662,6 +669,8 @@ class ProcessPoolExecutor(_base.Executor): def _start_executor_manager_thread(self): if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. + if not self._safe_to_dynamically_spawn_children: # ie, using fork. + self._launch_processes() self._executor_manager_thread = _ExecutorManagerThread(self) self._executor_manager_thread.start() _threads_wakeups[self._executor_manager_thread] = \ @@ -674,14 +683,31 @@ class ProcessPoolExecutor(_base.Executor): process_count = len(self._processes) if process_count < self._max_workers: - p = self._mp_context.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._initializer, - self._initargs)) - p.start() - self._processes[p.pid] = p + # Assertion disabled as this codepath is also used to replace a + # worker that unexpectedly dies, even when using the 'fork' start + # method. That means there is still a potential deadlock bug. If a + # 'fork' mp_context worker dies, we'll be forking a new one when + # we know a thread is running (self._executor_manager_thread). + #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' + self._spawn_process() + + def _launch_processes(self): + # https://github.com/python/cpython/issues/90622 + assert not self._executor_manager_thread, ( + 'Processes cannot be fork()ed after the thread has started, ' + 'deadlock in the child processes could result.') + for _ in range(len(self._processes), self._max_workers): + self._spawn_process() + + def _spawn_process(self): + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs)) + p.start() + self._processes[p.pid] = p def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: @@ -702,7 +728,8 @@ class ProcessPoolExecutor(_base.Executor): # Wake up queue management thread self._executor_manager_thread_wakeup.wakeup() - self._adjust_process_count() + if self._safe_to_dynamically_spawn_children: + self._adjust_process_count() self._start_executor_manager_thread() return f submit.__doc__ = _base.Executor.submit.__doc__ |
