diff options
author | shadchin <shadchin@yandex-team.com> | 2024-12-23 19:39:02 +0300 |
---|---|---|
committer | shadchin <shadchin@yandex-team.com> | 2024-12-23 19:54:20 +0300 |
commit | 65a5bf9d37a3b29eb394f560b9a09318196c40e8 (patch) | |
tree | e5cd68fb0682b2388e52d9806bb87adc348e21a8 /contrib/tools/python3/Lib/concurrent/futures | |
parent | a1dd87a52878ab3e46e5fd2dba5ecbba6113d7e0 (diff) | |
download | ydb-65a5bf9d37a3b29eb394f560b9a09318196c40e8.tar.gz |
Update Python 3 to 3.12.8
commit_hash:c20045b8a987d8720e1f3328270357491d5530f3
Diffstat (limited to 'contrib/tools/python3/Lib/concurrent/futures')
-rw-r--r-- | contrib/tools/python3/Lib/concurrent/futures/process.py | 52 | ||||
-rw-r--r-- | contrib/tools/python3/Lib/concurrent/futures/thread.py | 1 |
2 files changed, 23 insertions, 30 deletions
diff --git a/contrib/tools/python3/Lib/concurrent/futures/process.py b/contrib/tools/python3/Lib/concurrent/futures/process.py index 0e45288396..ff7c17efaa 100644 --- a/contrib/tools/python3/Lib/concurrent/futures/process.py +++ b/contrib/tools/python3/Lib/concurrent/futures/process.py @@ -68,27 +68,31 @@ _global_shutdown = False class _ThreadWakeup: def __init__(self): self._closed = False + self._lock = threading.Lock() self._reader, self._writer = mp.Pipe(duplex=False) def close(self): - # Please note that we do not take the shutdown lock when + # Please note that we do not take the self._lock when # calling clear() (to avoid deadlocking) so this method can # only be called safely from the same thread as all calls to - # clear() even if you hold the shutdown lock. Otherwise we + # clear() even if you hold the lock. Otherwise we # might try to read from the closed pipe. - if not self._closed: - self._closed = True - self._writer.close() - self._reader.close() + with self._lock: + if not self._closed: + self._closed = True + self._writer.close() + self._reader.close() def wakeup(self): - if not self._closed: - self._writer.send_bytes(b"") + with self._lock: + if not self._closed: + self._writer.send_bytes(b"") def clear(self): - if not self._closed: - while self._reader.poll(): - self._reader.recv_bytes() + if self._closed: + raise RuntimeError('operation on closed _ThreadWakeup') + while self._reader.poll(): + self._reader.recv_bytes() def _python_exit(): @@ -167,10 +171,8 @@ class _CallItem(object): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, - thread_wakeup): + def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): self.pending_work_items = pending_work_items - self.shutdown_lock = shutdown_lock self.thread_wakeup = thread_wakeup super().__init__(max_size, ctx=ctx) @@ -179,8 +181,7 @@ class _SafeQueue(Queue): tb = format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) - with self.shutdown_lock: - self.thread_wakeup.wakeup() + self.thread_wakeup.wakeup() # work_item can be None if another process terminated. In this # case, the executor_manager_thread fails all work_items # with BrokenProcessPool @@ -305,12 +306,10 @@ class _ExecutorManagerThread(threading.Thread): # will wake up the queue management thread so that it can terminate # if there is no pending work item. def weakref_cb(_, - thread_wakeup=self.thread_wakeup, - shutdown_lock=self.shutdown_lock): + thread_wakeup=self.thread_wakeup): mp.util.debug('Executor collected: triggering callback for' ' QueueManager wakeup') - with shutdown_lock: - thread_wakeup.wakeup() + thread_wakeup.wakeup() self.executor_reference = weakref.ref(executor, weakref_cb) @@ -438,11 +437,6 @@ class _ExecutorManagerThread(threading.Thread): elif wakeup_reader in ready: is_broken = False - # No need to hold the _shutdown_lock here because: - # 1. we're the only thread to use the wakeup reader - # 2. we're also the only thread to call thread_wakeup.close() - # 3. we want to avoid a possible deadlock when both reader and writer - # would block (gh-105829) self.thread_wakeup.clear() return result_item, is_broken, cause @@ -740,10 +734,9 @@ class ProcessPoolExecutor(_base.Executor): # as it could result in a deadlock if a worker process dies with the # _result_queue write lock still acquired. # - # _shutdown_lock must be locked to access _ThreadWakeup.close() and - # .wakeup(). Care must also be taken to not call clear or close from - # more than one thread since _ThreadWakeup.clear() is not protected by - # the _shutdown_lock + # Care must be taken to only call clear and close from the + # executor_manager_thread, since _ThreadWakeup.clear() is not protected + # by a lock. self._executor_manager_thread_wakeup = _ThreadWakeup() # Create communication channels for the executor @@ -754,7 +747,6 @@ class ProcessPoolExecutor(_base.Executor): self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, pending_work_items=self._pending_work_items, - shutdown_lock=self._shutdown_lock, thread_wakeup=self._executor_manager_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed diff --git a/contrib/tools/python3/Lib/concurrent/futures/thread.py b/contrib/tools/python3/Lib/concurrent/futures/thread.py index 3b3a36a509..61dbff8a48 100644 --- a/contrib/tools/python3/Lib/concurrent/futures/thread.py +++ b/contrib/tools/python3/Lib/concurrent/futures/thread.py @@ -41,6 +41,7 @@ if hasattr(os, 'register_at_fork'): os.register_at_fork(before=_global_shutdown_lock.acquire, after_in_child=_global_shutdown_lock._at_fork_reinit, after_in_parent=_global_shutdown_lock.release) + os.register_at_fork(after_in_child=_threads_queues.clear) class _WorkItem: |