diff options
author | shadchin <shadchin@yandex-team.com> | 2024-02-12 07:53:52 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@ydb.tech> | 2024-02-14 14:26:16 +0000 |
commit | 31f2a419764a8ba77c2a970cfc80056c6cd06756 (patch) | |
tree | c1995d239eba8571cefc640f6648e1d5dd4ce9e2 /contrib/tools/python3/src/Lib/concurrent/futures | |
parent | fe2ef02b38d9c85d80060963b265a1df9f38c3bb (diff) | |
download | ydb-31f2a419764a8ba77c2a970cfc80056c6cd06756.tar.gz |
Update Python from 3.11.8 to 3.12.2
Diffstat (limited to 'contrib/tools/python3/src/Lib/concurrent/futures')
-rw-r--r-- | contrib/tools/python3/src/Lib/concurrent/futures/process.py | 62 | ||||
-rw-r--r-- | contrib/tools/python3/src/Lib/concurrent/futures/thread.py | 19 |
2 files changed, 53 insertions, 28 deletions
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/process.py b/contrib/tools/python3/src/Lib/concurrent/futures/process.py index 952fa90345..0e45288396 100644 --- a/contrib/tools/python3/src/Lib/concurrent/futures/process.py +++ b/contrib/tools/python3/src/Lib/concurrent/futures/process.py @@ -49,6 +49,8 @@ import os from concurrent.futures import _base import queue import multiprocessing as mp +# This import is required to load the multiprocessing.connection submodule +# so that it can be accessed later as `mp.connection` import multiprocessing.connection from multiprocessing.queues import Queue import threading @@ -339,7 +341,14 @@ class _ExecutorManagerThread(threading.Thread): # Main loop for the executor manager thread. while True: - self.add_call_item_to_queue() + # gh-109047: During Python finalization, self.call_queue.put() + # creation of a thread can fail with RuntimeError. + try: + self.add_call_item_to_queue() + except BaseException as exc: + cause = format_exception(exc) + self.terminate_broken(cause) + return result_item, is_broken, cause = self.wait_result_broken_or_wakeup() @@ -423,8 +432,8 @@ class _ExecutorManagerThread(threading.Thread): try: result_item = result_reader.recv() is_broken = False - except BaseException as e: - cause = format_exception(type(e), e, e.__traceback__) + except BaseException as exc: + cause = format_exception(exc) elif wakeup_reader in ready: is_broken = False @@ -471,7 +480,7 @@ class _ExecutorManagerThread(threading.Thread): return (_global_shutdown or executor is None or executor._shutdown_thread) - def terminate_broken(self, cause): + def _terminate_broken(self, cause): # Terminate the executor because it is in a broken state. The cause # argument can be used to display more information on the error that # lead the executor into becoming broken. @@ -496,7 +505,14 @@ class _ExecutorManagerThread(threading.Thread): # Mark pending tasks as failed. for work_id, work_item in self.pending_work_items.items(): - work_item.future.set_exception(bpe) + try: + work_item.future.set_exception(bpe) + except _base.InvalidStateError: + # set_exception() fails if the future is cancelled: ignore it. + # Trying to check if the future is cancelled before calling + # set_exception() would leave a race condition if the future is + # cancelled between the check and set_exception(). + pass # Delete references to object. See issue16284 del work_item self.pending_work_items.clear() @@ -506,17 +522,14 @@ class _ExecutorManagerThread(threading.Thread): for p in self.processes.values(): p.terminate() - # Prevent queue writing to a pipe which is no longer read. - # https://github.com/python/cpython/issues/94777 - self.call_queue._reader.close() - - # gh-107219: Close the connection writer which can unblock - # Queue._feed() if it was stuck in send_bytes(). - if sys.platform == 'win32': - self.call_queue._writer.close() + self.call_queue._terminate_broken() # clean up resources - self.join_executor_internals() + self._join_executor_internals(broken=True) + + def terminate_broken(self, cause): + with self.shutdown_lock: + self._terminate_broken(cause) def flag_executor_shutting_down(self): # Flag the executor as shutting down and cancel remaining tasks if @@ -559,15 +572,24 @@ class _ExecutorManagerThread(threading.Thread): break def join_executor_internals(self): - self.shutdown_workers() + with self.shutdown_lock: + self._join_executor_internals() + + def _join_executor_internals(self, broken=False): + # If broken, call_queue was closed and so can no longer be used. + if not broken: + self.shutdown_workers() + # Release the queue's resources as soon as possible. self.call_queue.close() self.call_queue.join_thread() - with self.shutdown_lock: - self.thread_wakeup.close() + self.thread_wakeup.close() + # If .join() is not called on the created processes then # some ctx.Queue methods may deadlock on Mac OS X. for p in self.processes.values(): + if broken: + p.terminate() p.join() def get_n_children_alive(self): @@ -639,9 +661,9 @@ class ProcessPoolExecutor(_base.Executor): max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. - mp_context: A multiprocessing context to launch the workers. This - object should provide SimpleQueue, Queue and Process. Useful - to allow specific multiprocessing start methods. + mp_context: A multiprocessing context to launch the workers created + using the multiprocessing.get_context('start method') API. This + object should provide SimpleQueue, Queue and Process. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. max_tasks_per_child: The maximum number of tasks a worker process diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/thread.py b/contrib/tools/python3/src/Lib/concurrent/futures/thread.py index 51c942f51a..3b3a36a509 100644 --- a/contrib/tools/python3/src/Lib/concurrent/futures/thread.py +++ b/contrib/tools/python3/src/Lib/concurrent/futures/thread.py @@ -43,7 +43,7 @@ if hasattr(os, 'register_at_fork'): after_in_parent=_global_shutdown_lock.release) -class _WorkItem(object): +class _WorkItem: def __init__(self, future, fn, args, kwargs): self.future = future self.fn = fn @@ -78,17 +78,20 @@ def _worker(executor_reference, work_queue, initializer, initargs): return try: while True: - work_item = work_queue.get(block=True) - if work_item is not None: - work_item.run() - # Delete references to object. See issue16284 - del work_item - - # attempt to increment idle count + try: + work_item = work_queue.get_nowait() + except queue.Empty: + # attempt to increment idle count if queue is empty executor = executor_reference() if executor is not None: executor._idle_semaphore.release() del executor + work_item = work_queue.get(block=True) + + if work_item is not None: + work_item.run() + # Delete references to object. See GH-60488 + del work_item continue executor = executor_reference() |