author    shadchin <shadchin@yandex-team.ru>  2022-02-10 16:44:39 +0300
committer Daniil Cherednik <dcherednik@yandex-team.ru>  2022-02-10 16:44:39 +0300
commit    e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch)
tree      64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/tools/python3/src/Lib/concurrent
parent    2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff)
download  ydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/concurrent')
-rw-r--r--  contrib/tools/python3/src/Lib/concurrent/futures/__init__.py |   2
-rw-r--r--  contrib/tools/python3/src/Lib/concurrent/futures/_base.py    | 100
-rw-r--r--  contrib/tools/python3/src/Lib/concurrent/futures/process.py  | 644
-rw-r--r--  contrib/tools/python3/src/Lib/concurrent/futures/thread.py   | 114
4 files changed, 430 insertions(+), 430 deletions(-)
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py b/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py
index 07c638d39f..d746aeac50 100644
--- a/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py
+++ b/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py
@@ -10,7 +10,7 @@ from concurrent.futures._base import (FIRST_COMPLETED,
ALL_COMPLETED,
CancelledError,
TimeoutError,
- InvalidStateError,
+ InvalidStateError,
BrokenExecutor,
Future,
Executor,
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py
index 78d17dca4d..5c00f2edbe 100644
--- a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py
+++ b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py
@@ -7,7 +7,7 @@ import collections
import logging
import threading
import time
-import types
+import types
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -54,10 +54,10 @@ class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass
-class InvalidStateError(Error):
- """The operation is not allowed in this state."""
- pass
-
+class InvalidStateError(Error):
+ """The operation is not allowed in this state."""
+ pass
+
class _Waiter(object):
"""Provides the event that wait() and as_completed() block on."""
def __init__(self):
@@ -284,14 +284,14 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
A named 2-tuple of sets. The first set, named 'done', contains the
futures that completed (finished or were cancelled) before the wait
completed. The second set, named 'not_done', contains uncompleted
- futures. Duplicate futures given to *fs* are removed and will be
- returned only once.
+ futures. Duplicate futures given to *fs* are removed and will be
+ returned only once.
"""
- fs = set(fs)
+ fs = set(fs)
with _AcquireFutures(fs):
- done = {f for f in fs
- if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
- not_done = fs - done
+ done = {f for f in fs
+ if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
+ not_done = fs - done
if (return_when == FIRST_COMPLETED) and done:
return DoneAndNotDoneFutures(done, not_done)
elif (return_when == FIRST_EXCEPTION) and done:
@@ -310,7 +310,7 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
f._waiters.remove(waiter)
done.update(waiter.finished_futures)
- return DoneAndNotDoneFutures(done, fs - done)
+ return DoneAndNotDoneFutures(done, fs - done)
class Future(object):
"""Represents the result of an asynchronous computation."""
@@ -387,11 +387,11 @@ class Future(object):
def __get_result(self):
if self._exception:
- try:
- raise self._exception
- finally:
- # Break a reference cycle with the exception in self._exception
- self = None
+ try:
+ raise self._exception
+ finally:
+ # Break a reference cycle with the exception in self._exception
+ self = None
else:
return self._result
@@ -410,10 +410,10 @@ class Future(object):
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
- try:
- fn(self)
- except Exception:
- LOGGER.exception('exception calling callback for %r', self)
+ try:
+ fn(self)
+ except Exception:
+ LOGGER.exception('exception calling callback for %r', self)
def result(self, timeout=None):
"""Return the result of the call that the future represents.
@@ -431,24 +431,24 @@ class Future(object):
timeout.
Exception: If the call raised then that exception will be raised.
"""
- try:
- with self._condition:
- if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
- raise CancelledError()
- elif self._state == FINISHED:
- return self.__get_result()
-
- self._condition.wait(timeout)
-
- if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
- raise CancelledError()
- elif self._state == FINISHED:
- return self.__get_result()
- else:
- raise TimeoutError()
- finally:
- # Break a reference cycle with the exception in self._exception
- self = None
+ try:
+ with self._condition:
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self.__get_result()
+
+ self._condition.wait(timeout)
+
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self.__get_result()
+ else:
+ raise TimeoutError()
+ finally:
+ # Break a reference cycle with the exception in self._exception
+ self = None
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
@@ -530,8 +530,8 @@ class Future(object):
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
- if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
- raise InvalidStateError('{}: {!r}'.format(self._state, self))
+ if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
+ raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = FINISHED
for waiter in self._waiters:
@@ -545,8 +545,8 @@ class Future(object):
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
- if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
- raise InvalidStateError('{}: {!r}'.format(self._state, self))
+ if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
+ raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._exception = exception
self._state = FINISHED
for waiter in self._waiters:
@@ -554,12 +554,12 @@ class Future(object):
self._condition.notify_all()
self._invoke_callbacks()
- __class_getitem__ = classmethod(types.GenericAlias)
-
+ __class_getitem__ = classmethod(types.GenericAlias)
+
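
The guards restored in set_result()/set_exception() raise InvalidStateError once a future is already CANCELLED or FINISHED. A small sketch with a bare Future, as an executor implementation or unit test would use it:

    from concurrent.futures import Future, InvalidStateError

    f = Future()
    f.set_result(42)       # PENDING -> FINISHED
    try:
        f.set_result(43)   # already FINISHED, rejected by the guard above
    except InvalidStateError as e:
        print(e)           # e.g. "FINISHED: <Future at 0x... state=finished returned int>"
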
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
- def submit(self, fn, /, *args, **kwargs):
+ def submit(self, fn, /, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
@@ -614,7 +614,7 @@ class Executor(object):
future.cancel()
return result_iterator()
- def shutdown(self, wait=True, *, cancel_futures=False):
+ def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
@@ -624,9 +624,9 @@ class Executor(object):
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
executor have been reclaimed.
- cancel_futures: If True then shutdown will cancel all pending
- futures. Futures that are completed or running will not be
- cancelled.
+ cancel_futures: If True then shutdown will cancel all pending
+ futures. Futures that are completed or running will not be
+ cancelled.
"""
pass
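
The cancel_futures keyword documented above (new in Python 3.9) lets shutdown() drop work the executor has not started. A brief sketch (the work helper is illustrative):

    import time
    from concurrent.futures import ThreadPoolExecutor

    def work(i):
        time.sleep(0.2)
        return i

    pool = ThreadPoolExecutor(max_workers=1)
    futures = [pool.submit(work, i) for i in range(10)]
    # Pending futures are cancelled; the one already running still finishes.
    pool.shutdown(wait=True, cancel_futures=True)
    print(sum(f.cancelled() for f in futures))  # typically 8 or 9 of the 10
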
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/process.py b/contrib/tools/python3/src/Lib/concurrent/futures/process.py
index fde626590c..a29e5247ab 100644
--- a/contrib/tools/python3/src/Lib/concurrent/futures/process.py
+++ b/contrib/tools/python3/src/Lib/concurrent/futures/process.py
@@ -3,7 +3,7 @@
"""Implements ProcessPoolExecutor.
-The following diagram and text describe the data-flow through the system:
+The following diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
@@ -49,13 +49,13 @@ import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
-import multiprocessing.connection
+import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
-import sys
+import sys
import traceback
@@ -65,23 +65,23 @@ _global_shutdown = False
class _ThreadWakeup:
def __init__(self):
- self._closed = False
+ self._closed = False
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
- if not self._closed:
- self._closed = True
- self._writer.close()
- self._reader.close()
+ if not self._closed:
+ self._closed = True
+ self._writer.close()
+ self._reader.close()
def wakeup(self):
- if not self._closed:
- self._writer.send_bytes(b"")
+ if not self._closed:
+ self._writer.send_bytes(b"")
def clear(self):
- if not self._closed:
- while self._reader.poll():
- self._reader.recv_bytes()
+ if not self._closed:
+ while self._reader.poll():
+ self._reader.recv_bytes()
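
_ThreadWakeup is a self-pipe: writing to one end of an mp.Pipe makes the other end readable, which unblocks a thread sitting in multiprocessing.connection.wait(). A standalone sketch of the same pattern (all names here are illustrative):

    import threading
    import multiprocessing as mp
    from multiprocessing import connection

    reader, writer = mp.Pipe(duplex=False)

    def waiter():
        # Blocks like the manager thread waiting on result_reader,
        # wakeup_reader and the worker sentinels.
        ready = connection.wait([reader])
        print("woken:", ready[0].recv_bytes())

    t = threading.Thread(target=waiter)
    t.start()
    writer.send_bytes(b"")  # wakeup(): any payload makes the pipe readable
    t.join()
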
def _python_exit():
@@ -89,17 +89,17 @@ def _python_exit():
_global_shutdown = True
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
- # call not protected by ProcessPoolExecutor._shutdown_lock
+ # call not protected by ProcessPoolExecutor._shutdown_lock
thread_wakeup.wakeup()
for t, _ in items:
t.join()
-# Register for `_python_exit()` to be called just before joining all
-# non-daemon threads. This is used instead of `atexit.register()` for
-# compatibility with subinterpreters, which no longer support daemon threads.
-# See bpo-39812 for context.
-threading._register_atexit(_python_exit)
-
+# Register for `_python_exit()` to be called just before joining all
+# non-daemon threads. This is used instead of `atexit.register()` for
+# compatibility with subinterpreters, which no longer support daemon threads.
+# See bpo-39812 for context.
+threading._register_atexit(_python_exit)
+
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
@@ -107,12 +107,12 @@ threading._register_atexit(_python_exit)
EXTRA_QUEUED_CALLS = 1
-# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
-# It can wait on, at most, 63 objects. There is an overhead of two objects:
-# - the result queue reader
-# - the thread wakeup reader
-_MAX_WINDOWS_WORKERS = 63 - 2
-
+# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
+# It can wait on, at most, 63 objects. There is an overhead of two objects:
+# - the result queue reader
+# - the thread wakeup reader
+_MAX_WINDOWS_WORKERS = 63 - 2
+
# Hack to embed stringification of remote traceback in local traceback
class _RemoteTraceback(Exception):
@@ -157,11 +157,11 @@ class _CallItem(object):
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
- def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
- thread_wakeup):
+ def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
+ thread_wakeup):
self.pending_work_items = pending_work_items
- self.shutdown_lock = shutdown_lock
- self.thread_wakeup = thread_wakeup
+ self.shutdown_lock = shutdown_lock
+ self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)
def _on_queue_feeder_error(self, e, obj):
@@ -169,11 +169,11 @@ class _SafeQueue(Queue):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
- with self.shutdown_lock:
- self.thread_wakeup.wakeup()
- # work_item can be None if another process terminated. In this
- # case, the executor_manager_thread fails all work_items
- # with BrokenProcessPool
+ with self.shutdown_lock:
+ self.thread_wakeup.wakeup()
+ # work_item can be None if another process terminated. In this
+ # case, the executor_manager_thread fails all work_items
+ # with BrokenProcessPool
if work_item is not None:
work_item.future.set_exception(e)
else:
@@ -189,7 +189,7 @@ def _get_chunks(*iterables, chunksize):
return
yield chunk
-
+
def _process_chunk(fn, chunk):
""" Processes a chunk of an iterable passed to map.
@@ -246,139 +246,139 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
_sendback_result(result_queue, call_item.work_id, exception=exc)
else:
_sendback_result(result_queue, call_item.work_id, result=r)
- del r
+ del r
# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item
-class _ExecutorManagerThread(threading.Thread):
- """Manages the communication between this process and the worker processes.
+class _ExecutorManagerThread(threading.Thread):
+ """Manages the communication between this process and the worker processes.
- The manager is run in a local thread.
+ The manager is run in a local thread.
Args:
- executor: A reference to the ProcessPoolExecutor that owns
- this thread. A weakref will be owned by the manager as well as
- references to internal objects used to introspect the state of
- the executor.
+ executor: A reference to the ProcessPoolExecutor that owns
+ this thread. A weakref will be owned by the manager as well as
+ references to internal objects used to introspect the state of
+ the executor.
"""
- def __init__(self, executor):
- # Store references to necessary internals of the executor.
-
- # A _ThreadWakeup to allow waking up the queue_manager_thread from the
- # main Thread and avoid deadlocks caused by permanently locked queues.
- self.thread_wakeup = executor._executor_manager_thread_wakeup
- self.shutdown_lock = executor._shutdown_lock
-
- # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
- # to determine if the ProcessPoolExecutor has been garbage collected
- # and that the manager can exit.
- # When the executor gets garbage collected, the weakref callback
- # will wake up the queue management thread so that it can terminate
- # if there is no pending work item.
- def weakref_cb(_,
- thread_wakeup=self.thread_wakeup,
- shutdown_lock=self.shutdown_lock):
- mp.util.debug('Executor collected: triggering callback for'
- ' QueueManager wakeup')
- with shutdown_lock:
- thread_wakeup.wakeup()
-
- self.executor_reference = weakref.ref(executor, weakref_cb)
-
- # A list of the ctx.Process instances used as workers.
- self.processes = executor._processes
-
- # A ctx.Queue that will be filled with _CallItems derived from
- # _WorkItems for processing by the process workers.
- self.call_queue = executor._call_queue
-
- # A ctx.SimpleQueue of _ResultItems generated by the process workers.
- self.result_queue = executor._result_queue
-
- # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
- self.work_ids_queue = executor._work_ids
-
- # A dict mapping work ids to _WorkItems e.g.
- # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
- self.pending_work_items = executor._pending_work_items
-
- super().__init__()
-
- def run(self):
- # Main loop for the executor manager thread.
-
- while True:
- self.add_call_item_to_queue()
-
- result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
-
- if is_broken:
- self.terminate_broken(cause)
- return
- if result_item is not None:
- self.process_result_item(result_item)
- # Delete reference to result_item to avoid keeping references
- # while waiting on new results.
- del result_item
-
- # attempt to increment idle process count
- executor = self.executor_reference()
- if executor is not None:
- executor._idle_worker_semaphore.release()
- del executor
-
- if self.is_shutting_down():
- self.flag_executor_shutting_down()
-
- # Since no new work items can be added, it is safe to shutdown
- # this thread if there are no pending work items.
- if not self.pending_work_items:
- self.join_executor_internals()
- return
-
- def add_call_item_to_queue(self):
- # Fills call_queue with _WorkItems from pending_work_items.
- # This function never blocks.
- while True:
- if self.call_queue.full():
- return
- try:
- work_id = self.work_ids_queue.get(block=False)
- except queue.Empty:
- return
- else:
- work_item = self.pending_work_items[work_id]
-
- if work_item.future.set_running_or_notify_cancel():
- self.call_queue.put(_CallItem(work_id,
- work_item.fn,
- work_item.args,
- work_item.kwargs),
- block=True)
- else:
- del self.pending_work_items[work_id]
- continue
-
- def wait_result_broken_or_wakeup(self):
+ def __init__(self, executor):
+ # Store references to necessary internals of the executor.
+
+ # A _ThreadWakeup to allow waking up the queue_manager_thread from the
+ # main Thread and avoid deadlocks caused by permanently locked queues.
+ self.thread_wakeup = executor._executor_manager_thread_wakeup
+ self.shutdown_lock = executor._shutdown_lock
+
+ # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
+ # to determine if the ProcessPoolExecutor has been garbage collected
+ # and that the manager can exit.
+ # When the executor gets garbage collected, the weakref callback
+ # will wake up the queue management thread so that it can terminate
+ # if there is no pending work item.
+ def weakref_cb(_,
+ thread_wakeup=self.thread_wakeup,
+ shutdown_lock=self.shutdown_lock):
+ mp.util.debug('Executor collected: triggering callback for'
+ ' QueueManager wakeup')
+ with shutdown_lock:
+ thread_wakeup.wakeup()
+
+ self.executor_reference = weakref.ref(executor, weakref_cb)
+
+ # A list of the ctx.Process instances used as workers.
+ self.processes = executor._processes
+
+ # A ctx.Queue that will be filled with _CallItems derived from
+ # _WorkItems for processing by the process workers.
+ self.call_queue = executor._call_queue
+
+ # A ctx.SimpleQueue of _ResultItems generated by the process workers.
+ self.result_queue = executor._result_queue
+
+ # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
+ self.work_ids_queue = executor._work_ids
+
+ # A dict mapping work ids to _WorkItems e.g.
+ # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+ self.pending_work_items = executor._pending_work_items
+
+ super().__init__()
+
+ def run(self):
+ # Main loop for the executor manager thread.
+
+ while True:
+ self.add_call_item_to_queue()
+
+ result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
+
+ if is_broken:
+ self.terminate_broken(cause)
+ return
+ if result_item is not None:
+ self.process_result_item(result_item)
+ # Delete reference to result_item to avoid keeping references
+ # while waiting on new results.
+ del result_item
+
+ # attempt to increment idle process count
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._idle_worker_semaphore.release()
+ del executor
+
+ if self.is_shutting_down():
+ self.flag_executor_shutting_down()
+
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not self.pending_work_items:
+ self.join_executor_internals()
+ return
+
+ def add_call_item_to_queue(self):
+ # Fills call_queue with _WorkItems from pending_work_items.
+ # This function never blocks.
+ while True:
+ if self.call_queue.full():
+ return
+ try:
+ work_id = self.work_ids_queue.get(block=False)
+ except queue.Empty:
+ return
+ else:
+ work_item = self.pending_work_items[work_id]
+
+ if work_item.future.set_running_or_notify_cancel():
+ self.call_queue.put(_CallItem(work_id,
+ work_item.fn,
+ work_item.args,
+ work_item.kwargs),
+ block=True)
+ else:
+ del self.pending_work_items[work_id]
+ continue
+
+ def wait_result_broken_or_wakeup(self):
# Wait for a result to be ready in the result_queue while checking
# that all worker processes are still running, or for a wake-up
# signal to be sent. The wake-up signals come either from new tasks being
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
- result_reader = self.result_queue._reader
- assert not self.thread_wakeup._closed
- wakeup_reader = self.thread_wakeup._reader
- readers = [result_reader, wakeup_reader]
- worker_sentinels = [p.sentinel for p in list(self.processes.values())]
- ready = mp.connection.wait(readers + worker_sentinels)
+ result_reader = self.result_queue._reader
+ assert not self.thread_wakeup._closed
+ wakeup_reader = self.thread_wakeup._reader
+ readers = [result_reader, wakeup_reader]
+ worker_sentinels = [p.sentinel for p in list(self.processes.values())]
+ ready = mp.connection.wait(readers + worker_sentinels)
cause = None
is_broken = True
- result_item = None
+ result_item = None
if result_reader in ready:
try:
result_item = result_reader.recv()
@@ -388,28 +388,28 @@ class _ExecutorManagerThread(threading.Thread):
elif wakeup_reader in ready:
is_broken = False
-
- with self.shutdown_lock:
- self.thread_wakeup.clear()
-
- return result_item, is_broken, cause
-
- def process_result_item(self, result_item):
- # Process the received result_item. This can be either the PID of a
- # worker that exited gracefully or a _ResultItem
-
+
+ with self.shutdown_lock:
+ self.thread_wakeup.clear()
+
+ return result_item, is_broken, cause
+
+ def process_result_item(self, result_item):
+ # Process the received result_item. This can be either the PID of a
+ # worker that exited gracefully or a _ResultItem
+
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
- assert self.is_shutting_down()
- p = self.processes.pop(result_item)
+ assert self.is_shutting_down()
+ p = self.processes.pop(result_item)
p.join()
- if not self.processes:
- self.join_executor_internals()
+ if not self.processes:
+ self.join_executor_internals()
return
- else:
- # Received a _ResultItem so mark the future as completed.
- work_item = self.pending_work_items.pop(result_item.work_id, None)
+ else:
+ # Received a _ResultItem so mark the future as completed.
+ work_item = self.pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
if work_item is not None:
if result_item.exception:
@@ -417,111 +417,111 @@ class _ExecutorManagerThread(threading.Thread):
else:
work_item.future.set_result(result_item.result)
- def is_shutting_down(self):
- # Check whether we should start shutting down the executor.
- executor = self.executor_reference()
+ def is_shutting_down(self):
+ # Check whether we should start shutting down the executor.
+ executor = self.executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
- return (_global_shutdown or executor is None
- or executor._shutdown_thread)
-
- def terminate_broken(self, cause):
- # Terminate the executor because it is in a broken state. The cause
- # argument can be used to display more information on the error that
- # led the executor into becoming broken.
-
- # Mark the process pool broken so that submits fail right now.
- executor = self.executor_reference()
- if executor is not None:
- executor._broken = ('A child process terminated '
- 'abruptly, the process pool is not '
- 'usable anymore')
- executor._shutdown_thread = True
- executor = None
-
- # All pending tasks are to be marked failed with the following
- # BrokenProcessPool error
- bpe = BrokenProcessPool("A process in the process pool was "
- "terminated abruptly while the future was "
- "running or pending.")
- if cause is not None:
- bpe.__cause__ = _RemoteTraceback(
- f"\n'''\n{''.join(cause)}'''")
-
- # Mark pending tasks as failed.
- for work_id, work_item in self.pending_work_items.items():
- work_item.future.set_exception(bpe)
- # Delete references to object. See issue16284
- del work_item
- self.pending_work_items.clear()
-
- # Terminate remaining workers forcibly: the queues or their
- # locks may be in a dirty state and block forever.
- for p in self.processes.values():
- p.terminate()
-
- # clean up resources
- self.join_executor_internals()
-
- def flag_executor_shutting_down(self):
- # Flag the executor as shutting down and cancel remaining tasks if
- # requested as early as possible if it is not gc-ed yet.
- executor = self.executor_reference()
- if executor is not None:
- executor._shutdown_thread = True
- # Cancel pending work items if requested.
- if executor._cancel_pending_futures:
- # Cancel all pending futures and update pending_work_items
- # to only have futures that are currently running.
- new_pending_work_items = {}
- for work_id, work_item in self.pending_work_items.items():
- if not work_item.future.cancel():
- new_pending_work_items[work_id] = work_item
- self.pending_work_items = new_pending_work_items
- # Drain work_ids_queue since we no longer need to
- # add items to the call queue.
- while True:
- try:
- self.work_ids_queue.get_nowait()
- except queue.Empty:
- break
- # Make sure we do this only once to not waste time looping
- # on running processes over and over.
- executor._cancel_pending_futures = False
-
- def shutdown_workers(self):
- n_children_to_stop = self.get_n_children_alive()
- n_sentinels_sent = 0
- # Send the right number of sentinels, to make sure all children are
- # properly terminated.
- while (n_sentinels_sent < n_children_to_stop
- and self.get_n_children_alive() > 0):
- for i in range(n_children_to_stop - n_sentinels_sent):
- try:
- self.call_queue.put_nowait(None)
- n_sentinels_sent += 1
- except queue.Full:
- break
-
- def join_executor_internals(self):
- self.shutdown_workers()
- # Release the queue's resources as soon as possible.
- self.call_queue.close()
- self.call_queue.join_thread()
- with self.shutdown_lock:
- self.thread_wakeup.close()
- # If .join() is not called on the created processes then
- # some ctx.Queue methods may deadlock on Mac OS X.
- for p in self.processes.values():
- p.join()
-
- def get_n_children_alive(self):
- # This is an upper bound on the number of children alive.
- return sum(p.is_alive() for p in self.processes.values())
-
-
+ return (_global_shutdown or executor is None
+ or executor._shutdown_thread)
+
+ def terminate_broken(self, cause):
+ # Terminate the executor because it is in a broken state. The cause
+ # argument can be used to display more information on the error that
+ # led the executor into becoming broken.
+
+ # Mark the process pool broken so that submits fail right now.
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._broken = ('A child process terminated '
+ 'abruptly, the process pool is not '
+ 'usable anymore')
+ executor._shutdown_thread = True
+ executor = None
+
+ # All pending tasks are to be marked failed with the following
+ # BrokenProcessPool error
+ bpe = BrokenProcessPool("A process in the process pool was "
+ "terminated abruptly while the future was "
+ "running or pending.")
+ if cause is not None:
+ bpe.__cause__ = _RemoteTraceback(
+ f"\n'''\n{''.join(cause)}'''")
+
+ # Mark pending tasks as failed.
+ for work_id, work_item in self.pending_work_items.items():
+ work_item.future.set_exception(bpe)
+ # Delete references to object. See issue16284
+ del work_item
+ self.pending_work_items.clear()
+
+ # Terminate remaining workers forcibly: the queues or their
+ # locks may be in a dirty state and block forever.
+ for p in self.processes.values():
+ p.terminate()
+
+ # clean up resources
+ self.join_executor_internals()
+
+ def flag_executor_shutting_down(self):
+ # Flag the executor as shutting down and cancel remaining tasks if
+ # requested as early as possible if it is not gc-ed yet.
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._shutdown_thread = True
+ # Cancel pending work items if requested.
+ if executor._cancel_pending_futures:
+ # Cancel all pending futures and update pending_work_items
+ # to only have futures that are currently running.
+ new_pending_work_items = {}
+ for work_id, work_item in self.pending_work_items.items():
+ if not work_item.future.cancel():
+ new_pending_work_items[work_id] = work_item
+ self.pending_work_items = new_pending_work_items
+ # Drain work_ids_queue since we no longer need to
+ # add items to the call queue.
+ while True:
+ try:
+ self.work_ids_queue.get_nowait()
+ except queue.Empty:
+ break
+ # Make sure we do this only once to not waste time looping
+ # on running processes over and over.
+ executor._cancel_pending_futures = False
+
+ def shutdown_workers(self):
+ n_children_to_stop = self.get_n_children_alive()
+ n_sentinels_sent = 0
+ # Send the right number of sentinels, to make sure all children are
+ # properly terminated.
+ while (n_sentinels_sent < n_children_to_stop
+ and self.get_n_children_alive() > 0):
+ for i in range(n_children_to_stop - n_sentinels_sent):
+ try:
+ self.call_queue.put_nowait(None)
+ n_sentinels_sent += 1
+ except queue.Full:
+ break
+
+ def join_executor_internals(self):
+ self.shutdown_workers()
+ # Release the queue's resources as soon as possible.
+ self.call_queue.close()
+ self.call_queue.join_thread()
+ with self.shutdown_lock:
+ self.thread_wakeup.close()
+ # If .join() is not called on the created processes then
+ # some ctx.Queue methods may deadlock on Mac OS X.
+ for p in self.processes.values():
+ p.join()
+
+ def get_n_children_alive(self):
+ # This is an upper bound on the number of children alive.
+ return sum(p.is_alive() for p in self.processes.values())
+
+
_system_limits_checked = False
_system_limited = None
@@ -580,23 +580,23 @@ class ProcessPoolExecutor(_base.Executor):
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
object should provide SimpleQueue, Queue and Process.
- initializer: A callable used to initialize worker processes.
+ initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
"""
_check_system_limits()
if max_workers is None:
self._max_workers = os.cpu_count() or 1
- if sys.platform == 'win32':
- self._max_workers = min(_MAX_WINDOWS_WORKERS,
- self._max_workers)
+ if sys.platform == 'win32':
+ self._max_workers = min(_MAX_WINDOWS_WORKERS,
+ self._max_workers)
else:
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
- elif (sys.platform == 'win32' and
- max_workers > _MAX_WINDOWS_WORKERS):
- raise ValueError(
- f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
+ elif (sys.platform == 'win32' and
+ max_workers > _MAX_WINDOWS_WORKERS):
+ raise ValueError(
+ f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
self._max_workers = max_workers
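
Tying together the constructor arguments touched above, a hedged construction example (the init_worker initializer and the choice of the 'spawn' context are illustrative):

    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor

    def init_worker(level):
        # Runs once in every worker process before it handles _CallItems.
        import logging
        logging.basicConfig(level=level)

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=4,
                                 mp_context=mp.get_context("spawn"),
                                 initializer=init_worker,
                                 initargs=("INFO",)) as pool:
            print(pool.submit(pow, 2, 10).result())  # 1024
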
@@ -610,7 +610,7 @@ class ProcessPoolExecutor(_base.Executor):
self._initargs = initargs
# Management thread
- self._executor_manager_thread = None
+ self._executor_manager_thread = None
# Map of pids to processes
self._processes = {}
@@ -618,22 +618,22 @@ class ProcessPoolExecutor(_base.Executor):
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
- self._idle_worker_semaphore = threading.Semaphore(0)
+ self._idle_worker_semaphore = threading.Semaphore(0)
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
- self._cancel_pending_futures = False
-
- # _ThreadWakeup is a communication channel used to interrupt the wait
- # of the main loop of executor_manager_thread from another thread (e.g.
- # when calling executor.submit or executor.shutdown). We do not use the
- # _result_queue to send wakeup signals to the executor_manager_thread
- # as it could result in a deadlock if a worker process dies with the
- # _result_queue write lock still acquired.
- #
- # _shutdown_lock must be locked to access _ThreadWakeup.
- self._executor_manager_thread_wakeup = _ThreadWakeup()
-
+ self._cancel_pending_futures = False
+
+ # _ThreadWakeup is a communication channel used to interrupt the wait
+ # of the main loop of executor_manager_thread from another thread (e.g.
+ # when calling executor.submit or executor.shutdown). We do not use the
+ # _result_queue to send wakeup signals to the executor_manager_thread
+ # as it could result in a deadlock if a worker process dies with the
+ # _result_queue write lock still acquired.
+ #
+ # _shutdown_lock must be locked to access _ThreadWakeup.
+ self._executor_manager_thread_wakeup = _ThreadWakeup()
+
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
@@ -641,9 +641,9 @@ class ProcessPoolExecutor(_base.Executor):
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
- pending_work_items=self._pending_work_items,
- shutdown_lock=self._shutdown_lock,
- thread_wakeup=self._executor_manager_thread_wakeup)
+ pending_work_items=self._pending_work_items,
+ shutdown_lock=self._shutdown_lock,
+ thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
@@ -651,21 +651,21 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
- def _start_executor_manager_thread(self):
- if self._executor_manager_thread is None:
+ def _start_executor_manager_thread(self):
+ if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
- self._executor_manager_thread = _ExecutorManagerThread(self)
- self._executor_manager_thread.start()
- _threads_wakeups[self._executor_manager_thread] = \
- self._executor_manager_thread_wakeup
+ self._executor_manager_thread = _ExecutorManagerThread(self)
+ self._executor_manager_thread.start()
+ _threads_wakeups[self._executor_manager_thread] = \
+ self._executor_manager_thread_wakeup
def _adjust_process_count(self):
- # if there's an idle process, we don't need to spawn a new one.
- if self._idle_worker_semaphore.acquire(blocking=False):
- return
-
- process_count = len(self._processes)
- if process_count < self._max_workers:
+ # if there's an idle process, we don't need to spawn a new one.
+ if self._idle_worker_semaphore.acquire(blocking=False):
+ return
+
+ process_count = len(self._processes)
+ if process_count < self._max_workers:
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
@@ -675,7 +675,7 @@ class ProcessPoolExecutor(_base.Executor):
p.start()
self._processes[p.pid] = p
- def submit(self, fn, /, *args, **kwargs):
+ def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool(self._broken)
@@ -692,10 +692,10 @@ class ProcessPoolExecutor(_base.Executor):
self._work_ids.put(self._queue_count)
self._queue_count += 1
# Wake up queue management thread
- self._executor_manager_thread_wakeup.wakeup()
+ self._executor_manager_thread_wakeup.wakeup()
- self._adjust_process_count()
- self._start_executor_manager_thread()
+ self._adjust_process_count()
+ self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__
@@ -728,24 +728,24 @@ class ProcessPoolExecutor(_base.Executor):
timeout=timeout)
return _chain_from_iterable_of_lists(results)
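
map() above relies on _get_chunks/_process_chunk, so chunksize controls how many inputs ride in each _CallItem: larger chunks mean fewer queue round-trips at the cost of coarser cancellation. A small sketch (square is an illustrative helper):

    from concurrent.futures import ProcessPoolExecutor

    def square(x):
        return x * x

    if __name__ == "__main__":
        with ProcessPoolExecutor() as pool:
            # With chunksize=100, 10_000 inputs travel in ~100 _CallItems
            # instead of 10_000 single-item ones.
            results = list(pool.map(square, range(10_000), chunksize=100))
            print(results[-1])  # 99980001
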
- def shutdown(self, wait=True, *, cancel_futures=False):
+ def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
- self._cancel_pending_futures = cancel_futures
+ self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
- if self._executor_manager_thread_wakeup is not None:
- # Wake up queue management thread
- self._executor_manager_thread_wakeup.wakeup()
-
- if self._executor_manager_thread is not None and wait:
- self._executor_manager_thread.join()
+ if self._executor_manager_thread_wakeup is not None:
+ # Wake up queue management thread
+ self._executor_manager_thread_wakeup.wakeup()
+
+ if self._executor_manager_thread is not None and wait:
+ self._executor_manager_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
- self._executor_manager_thread = None
- self._call_queue = None
- if self._result_queue is not None and wait:
- self._result_queue.close()
+ self._executor_manager_thread = None
+ self._call_queue = None
+ if self._result_queue is not None and wait:
+ self._result_queue.close()
self._result_queue = None
self._processes = None
- self._executor_manager_thread_wakeup = None
+ self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/thread.py b/contrib/tools/python3/src/Lib/concurrent/futures/thread.py
index 0ed4a2c091..51c942f51a 100644
--- a/contrib/tools/python3/src/Lib/concurrent/futures/thread.py
+++ b/contrib/tools/python3/src/Lib/concurrent/futures/thread.py
@@ -9,40 +9,40 @@ from concurrent.futures import _base
import itertools
import queue
import threading
-import types
+import types
import weakref
import os
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
-# Lock that ensures that new workers are not created while the interpreter is
-# shutting down. Must be held while mutating _threads_queues and _shutdown.
-_global_shutdown_lock = threading.Lock()
+# Lock that ensures that new workers are not created while the interpreter is
+# shutting down. Must be held while mutating _threads_queues and _shutdown.
+_global_shutdown_lock = threading.Lock()
def _python_exit():
global _shutdown
- with _global_shutdown_lock:
- _shutdown = True
+ with _global_shutdown_lock:
+ _shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
t.join()
-# Register for `_python_exit()` to be called just before joining all
-# non-daemon threads. This is used instead of `atexit.register()` for
-# compatibility with subinterpreters, which no longer support daemon threads.
-# See bpo-39812 for context.
-threading._register_atexit(_python_exit)
+# Register for `_python_exit()` to be called just before joining all
+# non-daemon threads. This is used instead of `atexit.register()` for
+# compatibility with subinterpreters, which no longer support daemon threads.
+# See bpo-39812 for context.
+threading._register_atexit(_python_exit)
+
+# At fork, reinitialize the `_global_shutdown_lock` lock in the child process
+if hasattr(os, 'register_at_fork'):
+ os.register_at_fork(before=_global_shutdown_lock.acquire,
+ after_in_child=_global_shutdown_lock._at_fork_reinit,
+ after_in_parent=_global_shutdown_lock.release)
-# At fork, reinitialize the `_global_shutdown_lock` lock in the child process
-if hasattr(os, 'register_at_fork'):
- os.register_at_fork(before=_global_shutdown_lock.acquire,
- after_in_child=_global_shutdown_lock._at_fork_reinit,
- after_in_parent=_global_shutdown_lock.release)
-
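
The os.register_at_fork block moved above keeps _global_shutdown_lock usable across fork(): acquire before forking, release in the parent, reinitialize in the child. The same pattern for an ordinary lock (a sketch; POSIX-only, and _at_fork_reinit is a private CPython method):

    import os
    import threading

    my_lock = threading.Lock()

    if hasattr(os, "register_at_fork"):  # absent on Windows
        os.register_at_fork(
            before=my_lock.acquire,                  # no fork mid-critical-section
            after_in_parent=my_lock.release,         # parent simply resumes
            after_in_child=my_lock._at_fork_reinit,  # child gets a fresh unlocked lock
        )
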
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
@@ -63,9 +63,9 @@ class _WorkItem(object):
else:
self.future.set_result(result)
- __class_getitem__ = classmethod(types.GenericAlias)
+ __class_getitem__ = classmethod(types.GenericAlias)
+
-
def _worker(executor_reference, work_queue, initializer, initargs):
if initializer is not None:
try:
@@ -83,14 +83,14 @@ def _worker(executor_reference, work_queue, initializer, initargs):
work_item.run()
# Delete references to object. See issue16284
del work_item
-
- # attempt to increment idle count
- executor = executor_reference()
- if executor is not None:
- executor._idle_semaphore.release()
- del executor
+
+ # attempt to increment idle count
+ executor = executor_reference()
+ if executor is not None:
+ executor._idle_semaphore.release()
+ del executor
continue
-
+
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
@@ -128,18 +128,18 @@ class ThreadPoolExecutor(_base.Executor):
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
- initializer: A callable used to initialize worker threads.
+ initializer: A callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
"""
if max_workers is None:
- # ThreadPoolExecutor is often used for:
- # * CPU-bound tasks which release the GIL
- # * I/O-bound tasks (which release the GIL, of course)
- #
- # We use cpu_count + 4 for both types of tasks.
- # But we limit it to 32 to avoid consuming surprisingly large
- # resources on many-core machines.
- max_workers = min(32, (os.cpu_count() or 1) + 4)
+ # ThreadPoolExecutor is often used for:
+ # * CPU-bound tasks which release the GIL
+ # * I/O-bound tasks (which release the GIL, of course)
+ #
+ # We use cpu_count + 4 for both types of tasks.
+ # But we limit it to 32 to avoid consuming surprisingly large
+ # resources on many-core machines.
+ max_workers = min(32, (os.cpu_count() or 1) + 4)
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
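
The default restored above is min(32, cpu_count + 4): the +4 keeps threads available for blocking I/O even on small machines, while the cap of 32 bounds resource use on many-core ones. Illustrative arithmetic:

    import os

    def default_max_workers(cpu_count=None):
        # Mirrors the expression in the hunk above.
        return min(32, (cpu_count or os.cpu_count() or 1) + 4)

    print(default_max_workers(4))   # 8  -> headroom for I/O-bound tasks
    print(default_max_workers(64))  # 32 -> capped on many-core machines
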
@@ -148,7 +148,7 @@ class ThreadPoolExecutor(_base.Executor):
self._max_workers = max_workers
self._work_queue = queue.SimpleQueue()
- self._idle_semaphore = threading.Semaphore(0)
+ self._idle_semaphore = threading.Semaphore(0)
self._threads = set()
self._broken = False
self._shutdown = False
@@ -158,8 +158,8 @@ class ThreadPoolExecutor(_base.Executor):
self._initializer = initializer
self._initargs = initargs
- def submit(self, fn, /, *args, **kwargs):
- with self._shutdown_lock, _global_shutdown_lock:
+ def submit(self, fn, /, *args, **kwargs):
+ with self._shutdown_lock, _global_shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)
@@ -178,15 +178,15 @@ class ThreadPoolExecutor(_base.Executor):
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
- # if idle threads are available, don't spin new threads
- if self._idle_semaphore.acquire(timeout=0):
- return
-
+ # if idle threads are available, don't spin new threads
+ if self._idle_semaphore.acquire(timeout=0):
+ return
+
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
-
+
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
@@ -213,22 +213,22 @@ class ThreadPoolExecutor(_base.Executor):
if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken))
- def shutdown(self, wait=True, *, cancel_futures=False):
+ def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._shutdown = True
- if cancel_futures:
- # Drain all work items from the queue, and then cancel their
- # associated futures.
- while True:
- try:
- work_item = self._work_queue.get_nowait()
- except queue.Empty:
- break
- if work_item is not None:
- work_item.future.cancel()
-
- # Send a wake-up to prevent threads calling
- # _work_queue.get(block=True) from permanently blocking.
+ if cancel_futures:
+ # Drain all work items from the queue, and then cancel their
+ # associated futures.
+ while True:
+ try:
+ work_item = self._work_queue.get_nowait()
+ except queue.Empty:
+ break
+ if work_item is not None:
+ work_item.future.cancel()
+
+ # Send a wake-up to prevent threads calling
+ # _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None)
if wait:
for t in self._threads: