aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/concurrent/futures/process.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:30 +0300
commit2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)
tree012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/concurrent/futures/process.py
parent6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)
downloadydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/concurrent/futures/process.py')
-rw-r--r--contrib/tools/python3/src/Lib/concurrent/futures/process.py644
1 files changed, 322 insertions, 322 deletions
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/process.py b/contrib/tools/python3/src/Lib/concurrent/futures/process.py
index a29e5247ab..fde626590c 100644
--- a/contrib/tools/python3/src/Lib/concurrent/futures/process.py
+++ b/contrib/tools/python3/src/Lib/concurrent/futures/process.py
@@ -3,7 +3,7 @@
"""Implements ProcessPoolExecutor.
-The following diagram and text describe the data-flow through the system:
+The following diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
@@ -49,13 +49,13 @@ import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
-import multiprocessing.connection
+import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
-import sys
+import sys
import traceback
@@ -65,23 +65,23 @@ _global_shutdown = False
class _ThreadWakeup:
def __init__(self):
- self._closed = False
+ self._closed = False
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
- if not self._closed:
- self._closed = True
- self._writer.close()
- self._reader.close()
+ if not self._closed:
+ self._closed = True
+ self._writer.close()
+ self._reader.close()
def wakeup(self):
- if not self._closed:
- self._writer.send_bytes(b"")
+ if not self._closed:
+ self._writer.send_bytes(b"")
def clear(self):
- if not self._closed:
- while self._reader.poll():
- self._reader.recv_bytes()
+ if not self._closed:
+ while self._reader.poll():
+ self._reader.recv_bytes()
def _python_exit():
@@ -89,17 +89,17 @@ def _python_exit():
_global_shutdown = True
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
- # call not protected by ProcessPoolExecutor._shutdown_lock
+ # call not protected by ProcessPoolExecutor._shutdown_lock
thread_wakeup.wakeup()
for t, _ in items:
t.join()
-# Register for `_python_exit()` to be called just before joining all
-# non-daemon threads. This is used instead of `atexit.register()` for
-# compatibility with subinterpreters, which no longer support daemon threads.
-# See bpo-39812 for context.
-threading._register_atexit(_python_exit)
-
+# Register for `_python_exit()` to be called just before joining all
+# non-daemon threads. This is used instead of `atexit.register()` for
+# compatibility with subinterpreters, which no longer support daemon threads.
+# See bpo-39812 for context.
+threading._register_atexit(_python_exit)
+
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
@@ -107,12 +107,12 @@ threading._register_atexit(_python_exit)
EXTRA_QUEUED_CALLS = 1
-# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
-# It can wait on, at most, 63 objects. There is an overhead of two objects:
-# - the result queue reader
-# - the thread wakeup reader
-_MAX_WINDOWS_WORKERS = 63 - 2
-
+# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
+# It can wait on, at most, 63 objects. There is an overhead of two objects:
+# - the result queue reader
+# - the thread wakeup reader
+_MAX_WINDOWS_WORKERS = 63 - 2
+
# Hack to embed stringification of remote traceback in local traceback
class _RemoteTraceback(Exception):
@@ -157,11 +157,11 @@ class _CallItem(object):
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
- def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
- thread_wakeup):
+ def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
+ thread_wakeup):
self.pending_work_items = pending_work_items
- self.shutdown_lock = shutdown_lock
- self.thread_wakeup = thread_wakeup
+ self.shutdown_lock = shutdown_lock
+ self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)
def _on_queue_feeder_error(self, e, obj):
@@ -169,11 +169,11 @@ class _SafeQueue(Queue):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
- with self.shutdown_lock:
- self.thread_wakeup.wakeup()
- # work_item can be None if another process terminated. In this
- # case, the executor_manager_thread fails all work_items
- # with BrokenProcessPool
+ with self.shutdown_lock:
+ self.thread_wakeup.wakeup()
+ # work_item can be None if another process terminated. In this
+ # case, the executor_manager_thread fails all work_items
+ # with BrokenProcessPool
if work_item is not None:
work_item.future.set_exception(e)
else:
@@ -189,7 +189,7 @@ def _get_chunks(*iterables, chunksize):
return
yield chunk
-
+
def _process_chunk(fn, chunk):
""" Processes a chunk of an iterable passed to map.
@@ -246,139 +246,139 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
_sendback_result(result_queue, call_item.work_id, exception=exc)
else:
_sendback_result(result_queue, call_item.work_id, result=r)
- del r
+ del r
# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item
-class _ExecutorManagerThread(threading.Thread):
- """Manages the communication between this process and the worker processes.
+class _ExecutorManagerThread(threading.Thread):
+ """Manages the communication between this process and the worker processes.
- The manager is run in a local thread.
+ The manager is run in a local thread.
Args:
- executor: A reference to the ProcessPoolExecutor that owns
- this thread. A weakref will be own by the manager as well as
- references to internal objects used to introspect the state of
- the executor.
+ executor: A reference to the ProcessPoolExecutor that owns
+ this thread. A weakref will be own by the manager as well as
+ references to internal objects used to introspect the state of
+ the executor.
"""
- def __init__(self, executor):
- # Store references to necessary internals of the executor.
-
- # A _ThreadWakeup to allow waking up the queue_manager_thread from the
- # main Thread and avoid deadlocks caused by permanently locked queues.
- self.thread_wakeup = executor._executor_manager_thread_wakeup
- self.shutdown_lock = executor._shutdown_lock
-
- # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
- # to determine if the ProcessPoolExecutor has been garbage collected
- # and that the manager can exit.
- # When the executor gets garbage collected, the weakref callback
- # will wake up the queue management thread so that it can terminate
- # if there is no pending work item.
- def weakref_cb(_,
- thread_wakeup=self.thread_wakeup,
- shutdown_lock=self.shutdown_lock):
- mp.util.debug('Executor collected: triggering callback for'
- ' QueueManager wakeup')
- with shutdown_lock:
- thread_wakeup.wakeup()
-
- self.executor_reference = weakref.ref(executor, weakref_cb)
-
- # A list of the ctx.Process instances used as workers.
- self.processes = executor._processes
-
- # A ctx.Queue that will be filled with _CallItems derived from
- # _WorkItems for processing by the process workers.
- self.call_queue = executor._call_queue
-
- # A ctx.SimpleQueue of _ResultItems generated by the process workers.
- self.result_queue = executor._result_queue
-
- # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
- self.work_ids_queue = executor._work_ids
-
- # A dict mapping work ids to _WorkItems e.g.
- # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
- self.pending_work_items = executor._pending_work_items
-
- super().__init__()
-
- def run(self):
- # Main loop for the executor manager thread.
-
- while True:
- self.add_call_item_to_queue()
-
- result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
-
- if is_broken:
- self.terminate_broken(cause)
- return
- if result_item is not None:
- self.process_result_item(result_item)
- # Delete reference to result_item to avoid keeping references
- # while waiting on new results.
- del result_item
-
- # attempt to increment idle process count
- executor = self.executor_reference()
- if executor is not None:
- executor._idle_worker_semaphore.release()
- del executor
-
- if self.is_shutting_down():
- self.flag_executor_shutting_down()
-
- # Since no new work items can be added, it is safe to shutdown
- # this thread if there are no pending work items.
- if not self.pending_work_items:
- self.join_executor_internals()
- return
-
- def add_call_item_to_queue(self):
- # Fills call_queue with _WorkItems from pending_work_items.
- # This function never blocks.
- while True:
- if self.call_queue.full():
- return
- try:
- work_id = self.work_ids_queue.get(block=False)
- except queue.Empty:
- return
- else:
- work_item = self.pending_work_items[work_id]
-
- if work_item.future.set_running_or_notify_cancel():
- self.call_queue.put(_CallItem(work_id,
- work_item.fn,
- work_item.args,
- work_item.kwargs),
- block=True)
- else:
- del self.pending_work_items[work_id]
- continue
-
- def wait_result_broken_or_wakeup(self):
+ def __init__(self, executor):
+ # Store references to necessary internals of the executor.
+
+ # A _ThreadWakeup to allow waking up the queue_manager_thread from the
+ # main Thread and avoid deadlocks caused by permanently locked queues.
+ self.thread_wakeup = executor._executor_manager_thread_wakeup
+ self.shutdown_lock = executor._shutdown_lock
+
+ # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
+ # to determine if the ProcessPoolExecutor has been garbage collected
+ # and that the manager can exit.
+ # When the executor gets garbage collected, the weakref callback
+ # will wake up the queue management thread so that it can terminate
+ # if there is no pending work item.
+ def weakref_cb(_,
+ thread_wakeup=self.thread_wakeup,
+ shutdown_lock=self.shutdown_lock):
+ mp.util.debug('Executor collected: triggering callback for'
+ ' QueueManager wakeup')
+ with shutdown_lock:
+ thread_wakeup.wakeup()
+
+ self.executor_reference = weakref.ref(executor, weakref_cb)
+
+ # A list of the ctx.Process instances used as workers.
+ self.processes = executor._processes
+
+ # A ctx.Queue that will be filled with _CallItems derived from
+ # _WorkItems for processing by the process workers.
+ self.call_queue = executor._call_queue
+
+ # A ctx.SimpleQueue of _ResultItems generated by the process workers.
+ self.result_queue = executor._result_queue
+
+ # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
+ self.work_ids_queue = executor._work_ids
+
+ # A dict mapping work ids to _WorkItems e.g.
+ # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+ self.pending_work_items = executor._pending_work_items
+
+ super().__init__()
+
+ def run(self):
+ # Main loop for the executor manager thread.
+
+ while True:
+ self.add_call_item_to_queue()
+
+ result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
+
+ if is_broken:
+ self.terminate_broken(cause)
+ return
+ if result_item is not None:
+ self.process_result_item(result_item)
+ # Delete reference to result_item to avoid keeping references
+ # while waiting on new results.
+ del result_item
+
+ # attempt to increment idle process count
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._idle_worker_semaphore.release()
+ del executor
+
+ if self.is_shutting_down():
+ self.flag_executor_shutting_down()
+
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not self.pending_work_items:
+ self.join_executor_internals()
+ return
+
+ def add_call_item_to_queue(self):
+ # Fills call_queue with _WorkItems from pending_work_items.
+ # This function never blocks.
+ while True:
+ if self.call_queue.full():
+ return
+ try:
+ work_id = self.work_ids_queue.get(block=False)
+ except queue.Empty:
+ return
+ else:
+ work_item = self.pending_work_items[work_id]
+
+ if work_item.future.set_running_or_notify_cancel():
+ self.call_queue.put(_CallItem(work_id,
+ work_item.fn,
+ work_item.args,
+ work_item.kwargs),
+ block=True)
+ else:
+ del self.pending_work_items[work_id]
+ continue
+
+ def wait_result_broken_or_wakeup(self):
# Wait for a result to be ready in the result_queue while checking
# that all worker processes are still running, or for a wake up
# signal send. The wake up signals come either from new tasks being
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
- result_reader = self.result_queue._reader
- assert not self.thread_wakeup._closed
- wakeup_reader = self.thread_wakeup._reader
- readers = [result_reader, wakeup_reader]
- worker_sentinels = [p.sentinel for p in list(self.processes.values())]
- ready = mp.connection.wait(readers + worker_sentinels)
+ result_reader = self.result_queue._reader
+ assert not self.thread_wakeup._closed
+ wakeup_reader = self.thread_wakeup._reader
+ readers = [result_reader, wakeup_reader]
+ worker_sentinels = [p.sentinel for p in list(self.processes.values())]
+ ready = mp.connection.wait(readers + worker_sentinels)
cause = None
is_broken = True
- result_item = None
+ result_item = None
if result_reader in ready:
try:
result_item = result_reader.recv()
@@ -388,28 +388,28 @@ class _ExecutorManagerThread(threading.Thread):
elif wakeup_reader in ready:
is_broken = False
-
- with self.shutdown_lock:
- self.thread_wakeup.clear()
-
- return result_item, is_broken, cause
-
- def process_result_item(self, result_item):
- # Process the received a result_item. This can be either the PID of a
- # worker that exited gracefully or a _ResultItem
-
+
+ with self.shutdown_lock:
+ self.thread_wakeup.clear()
+
+ return result_item, is_broken, cause
+
+ def process_result_item(self, result_item):
+ # Process the received a result_item. This can be either the PID of a
+ # worker that exited gracefully or a _ResultItem
+
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
- assert self.is_shutting_down()
- p = self.processes.pop(result_item)
+ assert self.is_shutting_down()
+ p = self.processes.pop(result_item)
p.join()
- if not self.processes:
- self.join_executor_internals()
+ if not self.processes:
+ self.join_executor_internals()
return
- else:
- # Received a _ResultItem so mark the future as completed.
- work_item = self.pending_work_items.pop(result_item.work_id, None)
+ else:
+ # Received a _ResultItem so mark the future as completed.
+ work_item = self.pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
if work_item is not None:
if result_item.exception:
@@ -417,111 +417,111 @@ class _ExecutorManagerThread(threading.Thread):
else:
work_item.future.set_result(result_item.result)
- def is_shutting_down(self):
- # Check whether we should start shutting down the executor.
- executor = self.executor_reference()
+ def is_shutting_down(self):
+ # Check whether we should start shutting down the executor.
+ executor = self.executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
- return (_global_shutdown or executor is None
- or executor._shutdown_thread)
-
- def terminate_broken(self, cause):
- # Terminate the executor because it is in a broken state. The cause
- # argument can be used to display more information on the error that
- # lead the executor into becoming broken.
-
- # Mark the process pool broken so that submits fail right now.
- executor = self.executor_reference()
- if executor is not None:
- executor._broken = ('A child process terminated '
- 'abruptly, the process pool is not '
- 'usable anymore')
- executor._shutdown_thread = True
- executor = None
-
- # All pending tasks are to be marked failed with the following
- # BrokenProcessPool error
- bpe = BrokenProcessPool("A process in the process pool was "
- "terminated abruptly while the future was "
- "running or pending.")
- if cause is not None:
- bpe.__cause__ = _RemoteTraceback(
- f"\n'''\n{''.join(cause)}'''")
-
- # Mark pending tasks as failed.
- for work_id, work_item in self.pending_work_items.items():
- work_item.future.set_exception(bpe)
- # Delete references to object. See issue16284
- del work_item
- self.pending_work_items.clear()
-
- # Terminate remaining workers forcibly: the queues or their
- # locks may be in a dirty state and block forever.
- for p in self.processes.values():
- p.terminate()
-
- # clean up resources
- self.join_executor_internals()
-
- def flag_executor_shutting_down(self):
- # Flag the executor as shutting down and cancel remaining tasks if
- # requested as early as possible if it is not gc-ed yet.
- executor = self.executor_reference()
- if executor is not None:
- executor._shutdown_thread = True
- # Cancel pending work items if requested.
- if executor._cancel_pending_futures:
- # Cancel all pending futures and update pending_work_items
- # to only have futures that are currently running.
- new_pending_work_items = {}
- for work_id, work_item in self.pending_work_items.items():
- if not work_item.future.cancel():
- new_pending_work_items[work_id] = work_item
- self.pending_work_items = new_pending_work_items
- # Drain work_ids_queue since we no longer need to
- # add items to the call queue.
- while True:
- try:
- self.work_ids_queue.get_nowait()
- except queue.Empty:
- break
- # Make sure we do this only once to not waste time looping
- # on running processes over and over.
- executor._cancel_pending_futures = False
-
- def shutdown_workers(self):
- n_children_to_stop = self.get_n_children_alive()
- n_sentinels_sent = 0
- # Send the right number of sentinels, to make sure all children are
- # properly terminated.
- while (n_sentinels_sent < n_children_to_stop
- and self.get_n_children_alive() > 0):
- for i in range(n_children_to_stop - n_sentinels_sent):
- try:
- self.call_queue.put_nowait(None)
- n_sentinels_sent += 1
- except queue.Full:
- break
-
- def join_executor_internals(self):
- self.shutdown_workers()
- # Release the queue's resources as soon as possible.
- self.call_queue.close()
- self.call_queue.join_thread()
- with self.shutdown_lock:
- self.thread_wakeup.close()
- # If .join() is not called on the created processes then
- # some ctx.Queue methods may deadlock on Mac OS X.
- for p in self.processes.values():
- p.join()
-
- def get_n_children_alive(self):
- # This is an upper bound on the number of children alive.
- return sum(p.is_alive() for p in self.processes.values())
-
-
+ return (_global_shutdown or executor is None
+ or executor._shutdown_thread)
+
+ def terminate_broken(self, cause):
+ # Terminate the executor because it is in a broken state. The cause
+ # argument can be used to display more information on the error that
+ # lead the executor into becoming broken.
+
+ # Mark the process pool broken so that submits fail right now.
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._broken = ('A child process terminated '
+ 'abruptly, the process pool is not '
+ 'usable anymore')
+ executor._shutdown_thread = True
+ executor = None
+
+ # All pending tasks are to be marked failed with the following
+ # BrokenProcessPool error
+ bpe = BrokenProcessPool("A process in the process pool was "
+ "terminated abruptly while the future was "
+ "running or pending.")
+ if cause is not None:
+ bpe.__cause__ = _RemoteTraceback(
+ f"\n'''\n{''.join(cause)}'''")
+
+ # Mark pending tasks as failed.
+ for work_id, work_item in self.pending_work_items.items():
+ work_item.future.set_exception(bpe)
+ # Delete references to object. See issue16284
+ del work_item
+ self.pending_work_items.clear()
+
+ # Terminate remaining workers forcibly: the queues or their
+ # locks may be in a dirty state and block forever.
+ for p in self.processes.values():
+ p.terminate()
+
+ # clean up resources
+ self.join_executor_internals()
+
+ def flag_executor_shutting_down(self):
+ # Flag the executor as shutting down and cancel remaining tasks if
+ # requested as early as possible if it is not gc-ed yet.
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._shutdown_thread = True
+ # Cancel pending work items if requested.
+ if executor._cancel_pending_futures:
+ # Cancel all pending futures and update pending_work_items
+ # to only have futures that are currently running.
+ new_pending_work_items = {}
+ for work_id, work_item in self.pending_work_items.items():
+ if not work_item.future.cancel():
+ new_pending_work_items[work_id] = work_item
+ self.pending_work_items = new_pending_work_items
+ # Drain work_ids_queue since we no longer need to
+ # add items to the call queue.
+ while True:
+ try:
+ self.work_ids_queue.get_nowait()
+ except queue.Empty:
+ break
+ # Make sure we do this only once to not waste time looping
+ # on running processes over and over.
+ executor._cancel_pending_futures = False
+
+ def shutdown_workers(self):
+ n_children_to_stop = self.get_n_children_alive()
+ n_sentinels_sent = 0
+ # Send the right number of sentinels, to make sure all children are
+ # properly terminated.
+ while (n_sentinels_sent < n_children_to_stop
+ and self.get_n_children_alive() > 0):
+ for i in range(n_children_to_stop - n_sentinels_sent):
+ try:
+ self.call_queue.put_nowait(None)
+ n_sentinels_sent += 1
+ except queue.Full:
+ break
+
+ def join_executor_internals(self):
+ self.shutdown_workers()
+ # Release the queue's resources as soon as possible.
+ self.call_queue.close()
+ self.call_queue.join_thread()
+ with self.shutdown_lock:
+ self.thread_wakeup.close()
+ # If .join() is not called on the created processes then
+ # some ctx.Queue methods may deadlock on Mac OS X.
+ for p in self.processes.values():
+ p.join()
+
+ def get_n_children_alive(self):
+ # This is an upper bound on the number of children alive.
+ return sum(p.is_alive() for p in self.processes.values())
+
+
_system_limits_checked = False
_system_limited = None
@@ -580,23 +580,23 @@ class ProcessPoolExecutor(_base.Executor):
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
object should provide SimpleQueue, Queue and Process.
- initializer: A callable used to initialize worker processes.
+ initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
"""
_check_system_limits()
if max_workers is None:
self._max_workers = os.cpu_count() or 1
- if sys.platform == 'win32':
- self._max_workers = min(_MAX_WINDOWS_WORKERS,
- self._max_workers)
+ if sys.platform == 'win32':
+ self._max_workers = min(_MAX_WINDOWS_WORKERS,
+ self._max_workers)
else:
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
- elif (sys.platform == 'win32' and
- max_workers > _MAX_WINDOWS_WORKERS):
- raise ValueError(
- f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
+ elif (sys.platform == 'win32' and
+ max_workers > _MAX_WINDOWS_WORKERS):
+ raise ValueError(
+ f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
self._max_workers = max_workers
@@ -610,7 +610,7 @@ class ProcessPoolExecutor(_base.Executor):
self._initargs = initargs
# Management thread
- self._executor_manager_thread = None
+ self._executor_manager_thread = None
# Map of pids to processes
self._processes = {}
@@ -618,22 +618,22 @@ class ProcessPoolExecutor(_base.Executor):
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
- self._idle_worker_semaphore = threading.Semaphore(0)
+ self._idle_worker_semaphore = threading.Semaphore(0)
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
- self._cancel_pending_futures = False
-
- # _ThreadWakeup is a communication channel used to interrupt the wait
- # of the main loop of executor_manager_thread from another thread (e.g.
- # when calling executor.submit or executor.shutdown). We do not use the
- # _result_queue to send wakeup signals to the executor_manager_thread
- # as it could result in a deadlock if a worker process dies with the
- # _result_queue write lock still acquired.
- #
- # _shutdown_lock must be locked to access _ThreadWakeup.
- self._executor_manager_thread_wakeup = _ThreadWakeup()
-
+ self._cancel_pending_futures = False
+
+ # _ThreadWakeup is a communication channel used to interrupt the wait
+ # of the main loop of executor_manager_thread from another thread (e.g.
+ # when calling executor.submit or executor.shutdown). We do not use the
+ # _result_queue to send wakeup signals to the executor_manager_thread
+ # as it could result in a deadlock if a worker process dies with the
+ # _result_queue write lock still acquired.
+ #
+ # _shutdown_lock must be locked to access _ThreadWakeup.
+ self._executor_manager_thread_wakeup = _ThreadWakeup()
+
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
@@ -641,9 +641,9 @@ class ProcessPoolExecutor(_base.Executor):
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
- pending_work_items=self._pending_work_items,
- shutdown_lock=self._shutdown_lock,
- thread_wakeup=self._executor_manager_thread_wakeup)
+ pending_work_items=self._pending_work_items,
+ shutdown_lock=self._shutdown_lock,
+ thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
@@ -651,21 +651,21 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
- def _start_executor_manager_thread(self):
- if self._executor_manager_thread is None:
+ def _start_executor_manager_thread(self):
+ if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
- self._executor_manager_thread = _ExecutorManagerThread(self)
- self._executor_manager_thread.start()
- _threads_wakeups[self._executor_manager_thread] = \
- self._executor_manager_thread_wakeup
+ self._executor_manager_thread = _ExecutorManagerThread(self)
+ self._executor_manager_thread.start()
+ _threads_wakeups[self._executor_manager_thread] = \
+ self._executor_manager_thread_wakeup
def _adjust_process_count(self):
- # if there's an idle process, we don't need to spawn a new one.
- if self._idle_worker_semaphore.acquire(blocking=False):
- return
-
- process_count = len(self._processes)
- if process_count < self._max_workers:
+ # if there's an idle process, we don't need to spawn a new one.
+ if self._idle_worker_semaphore.acquire(blocking=False):
+ return
+
+ process_count = len(self._processes)
+ if process_count < self._max_workers:
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
@@ -675,7 +675,7 @@ class ProcessPoolExecutor(_base.Executor):
p.start()
self._processes[p.pid] = p
- def submit(self, fn, /, *args, **kwargs):
+ def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool(self._broken)
@@ -692,10 +692,10 @@ class ProcessPoolExecutor(_base.Executor):
self._work_ids.put(self._queue_count)
self._queue_count += 1
# Wake up queue management thread
- self._executor_manager_thread_wakeup.wakeup()
+ self._executor_manager_thread_wakeup.wakeup()
- self._adjust_process_count()
- self._start_executor_manager_thread()
+ self._adjust_process_count()
+ self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__
@@ -728,24 +728,24 @@ class ProcessPoolExecutor(_base.Executor):
timeout=timeout)
return _chain_from_iterable_of_lists(results)
- def shutdown(self, wait=True, *, cancel_futures=False):
+ def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
- self._cancel_pending_futures = cancel_futures
+ self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
- if self._executor_manager_thread_wakeup is not None:
- # Wake up queue management thread
- self._executor_manager_thread_wakeup.wakeup()
-
- if self._executor_manager_thread is not None and wait:
- self._executor_manager_thread.join()
+ if self._executor_manager_thread_wakeup is not None:
+ # Wake up queue management thread
+ self._executor_manager_thread_wakeup.wakeup()
+
+ if self._executor_manager_thread is not None and wait:
+ self._executor_manager_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
- self._executor_manager_thread = None
- self._call_queue = None
- if self._result_queue is not None and wait:
- self._result_queue.close()
+ self._executor_manager_thread = None
+ self._call_queue = None
+ if self._result_queue is not None and wait:
+ self._result_queue.close()
self._result_queue = None
self._processes = None
- self._executor_manager_thread_wakeup = None
+ self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__