aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/Lib/concurrent
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.com>2024-12-23 19:39:02 +0300
committershadchin <shadchin@yandex-team.com>2024-12-23 19:54:20 +0300
commit65a5bf9d37a3b29eb394f560b9a09318196c40e8 (patch)
treee5cd68fb0682b2388e52d9806bb87adc348e21a8 /contrib/tools/python3/Lib/concurrent
parenta1dd87a52878ab3e46e5fd2dba5ecbba6113d7e0 (diff)
downloadydb-65a5bf9d37a3b29eb394f560b9a09318196c40e8.tar.gz
Update Python 3 to 3.12.8
commit_hash:c20045b8a987d8720e1f3328270357491d5530f3
Diffstat (limited to 'contrib/tools/python3/Lib/concurrent')
-rw-r--r--contrib/tools/python3/Lib/concurrent/futures/process.py52
-rw-r--r--contrib/tools/python3/Lib/concurrent/futures/thread.py1
2 files changed, 23 insertions, 30 deletions
diff --git a/contrib/tools/python3/Lib/concurrent/futures/process.py b/contrib/tools/python3/Lib/concurrent/futures/process.py
index 0e45288396..ff7c17efaa 100644
--- a/contrib/tools/python3/Lib/concurrent/futures/process.py
+++ b/contrib/tools/python3/Lib/concurrent/futures/process.py
@@ -68,27 +68,31 @@ _global_shutdown = False
class _ThreadWakeup:
def __init__(self):
self._closed = False
+ self._lock = threading.Lock()
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
- # Please note that we do not take the shutdown lock when
+ # Please note that we do not take the self._lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
- # clear() even if you hold the shutdown lock. Otherwise we
+ # clear() even if you hold the lock. Otherwise we
# might try to read from the closed pipe.
- if not self._closed:
- self._closed = True
- self._writer.close()
- self._reader.close()
+ with self._lock:
+ if not self._closed:
+ self._closed = True
+ self._writer.close()
+ self._reader.close()
def wakeup(self):
- if not self._closed:
- self._writer.send_bytes(b"")
+ with self._lock:
+ if not self._closed:
+ self._writer.send_bytes(b"")
def clear(self):
- if not self._closed:
- while self._reader.poll():
- self._reader.recv_bytes()
+ if self._closed:
+ raise RuntimeError('operation on closed _ThreadWakeup')
+ while self._reader.poll():
+ self._reader.recv_bytes()
def _python_exit():
@@ -167,10 +171,8 @@ class _CallItem(object):
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
- def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
- thread_wakeup):
+ def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
self.pending_work_items = pending_work_items
- self.shutdown_lock = shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)
@@ -179,8 +181,7 @@ class _SafeQueue(Queue):
tb = format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
- with self.shutdown_lock:
- self.thread_wakeup.wakeup()
+ self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
@@ -305,12 +306,10 @@ class _ExecutorManagerThread(threading.Thread):
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
def weakref_cb(_,
- thread_wakeup=self.thread_wakeup,
- shutdown_lock=self.shutdown_lock):
+ thread_wakeup=self.thread_wakeup):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
- with shutdown_lock:
- thread_wakeup.wakeup()
+ thread_wakeup.wakeup()
self.executor_reference = weakref.ref(executor, weakref_cb)
@@ -438,11 +437,6 @@ class _ExecutorManagerThread(threading.Thread):
elif wakeup_reader in ready:
is_broken = False
- # No need to hold the _shutdown_lock here because:
- # 1. we're the only thread to use the wakeup reader
- # 2. we're also the only thread to call thread_wakeup.close()
- # 3. we want to avoid a possible deadlock when both reader and writer
- # would block (gh-105829)
self.thread_wakeup.clear()
return result_item, is_broken, cause
@@ -740,10 +734,9 @@ class ProcessPoolExecutor(_base.Executor):
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
- # _shutdown_lock must be locked to access _ThreadWakeup.close() and
- # .wakeup(). Care must also be taken to not call clear or close from
- # more than one thread since _ThreadWakeup.clear() is not protected by
- # the _shutdown_lock
+ # Care must be taken to only call clear and close from the
+ # executor_manager_thread, since _ThreadWakeup.clear() is not protected
+ # by a lock.
self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor
@@ -754,7 +747,6 @@ class ProcessPoolExecutor(_base.Executor):
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
- shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
diff --git a/contrib/tools/python3/Lib/concurrent/futures/thread.py b/contrib/tools/python3/Lib/concurrent/futures/thread.py
index 3b3a36a509..61dbff8a48 100644
--- a/contrib/tools/python3/Lib/concurrent/futures/thread.py
+++ b/contrib/tools/python3/Lib/concurrent/futures/thread.py
@@ -41,6 +41,7 @@ if hasattr(os, 'register_at_fork'):
os.register_at_fork(before=_global_shutdown_lock.acquire,
after_in_child=_global_shutdown_lock._at_fork_reinit,
after_in_parent=_global_shutdown_lock.release)
+ os.register_at_fork(after_in_child=_threads_queues.clear)
class _WorkItem: