diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/concurrent/futures | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/concurrent/futures')
4 files changed, 430 insertions, 430 deletions
diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py b/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py index d746aeac50..07c638d39f 100644 --- a/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py +++ b/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py @@ -10,7 +10,7 @@ from concurrent.futures._base import (FIRST_COMPLETED, ALL_COMPLETED, CancelledError, TimeoutError, - InvalidStateError, + InvalidStateError, BrokenExecutor, Future, Executor, diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py index 5c00f2edbe..78d17dca4d 100644 --- a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py +++ b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py @@ -7,7 +7,7 @@ import collections import logging import threading import time -import types +import types FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' @@ -54,10 +54,10 @@ class TimeoutError(Error): """The operation exceeded the given deadline.""" pass -class InvalidStateError(Error): - """The operation is not allowed in this state.""" - pass - +class InvalidStateError(Error): + """The operation is not allowed in this state.""" + pass + class _Waiter(object): """Provides the event that wait() and as_completed() block on.""" def __init__(self): @@ -284,14 +284,14 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): A named 2-tuple of sets. The first set, named 'done', contains the futures that completed (is finished or cancelled) before the wait completed. The second set, named 'not_done', contains uncompleted - futures. Duplicate futures given to *fs* are removed and will be - returned only once. + futures. Duplicate futures given to *fs* are removed and will be + returned only once. """ - fs = set(fs) + fs = set(fs) with _AcquireFutures(fs): - done = {f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]} - not_done = fs - done + done = {f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]} + not_done = fs - done if (return_when == FIRST_COMPLETED) and done: return DoneAndNotDoneFutures(done, not_done) elif (return_when == FIRST_EXCEPTION) and done: @@ -310,7 +310,7 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): f._waiters.remove(waiter) done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, fs - done) + return DoneAndNotDoneFutures(done, fs - done) class Future(object): """Represents the result of an asynchronous computation.""" @@ -387,11 +387,11 @@ class Future(object): def __get_result(self): if self._exception: - try: - raise self._exception - finally: - # Break a reference cycle with the exception in self._exception - self = None + try: + raise self._exception + finally: + # Break a reference cycle with the exception in self._exception + self = None else: return self._result @@ -410,10 +410,10 @@ class Future(object): if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: self._done_callbacks.append(fn) return - try: - fn(self) - except Exception: - LOGGER.exception('exception calling callback for %r', self) + try: + fn(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) def result(self, timeout=None): """Return the result of the call that the future represents. @@ -431,24 +431,24 @@ class Future(object): timeout. Exception: If the call raised then that exception will be raised. """ - try: - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() - finally: - # Break a reference cycle with the exception in self._exception - self = None + try: + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + finally: + # Break a reference cycle with the exception in self._exception + self = None def exception(self, timeout=None): """Return the exception raised by the call that the future represents. @@ -530,8 +530,8 @@ class Future(object): Should only be used by Executor implementations and unit tests. """ with self._condition: - if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: - raise InvalidStateError('{}: {!r}'.format(self._state, self)) + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = FINISHED for waiter in self._waiters: @@ -545,8 +545,8 @@ class Future(object): Should only be used by Executor implementations and unit tests. """ with self._condition: - if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: - raise InvalidStateError('{}: {!r}'.format(self._state, self)) + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._exception = exception self._state = FINISHED for waiter in self._waiters: @@ -554,12 +554,12 @@ class Future(object): self._condition.notify_all() self._invoke_callbacks() - __class_getitem__ = classmethod(types.GenericAlias) - + __class_getitem__ = classmethod(types.GenericAlias) + class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" - def submit(self, fn, /, *args, **kwargs): + def submit(self, fn, /, *args, **kwargs): """Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns @@ -614,7 +614,7 @@ class Executor(object): future.cancel() return result_iterator() - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait=True, *, cancel_futures=False): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other @@ -624,9 +624,9 @@ class Executor(object): wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. + cancel_futures: If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. """ pass diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/process.py b/contrib/tools/python3/src/Lib/concurrent/futures/process.py index a29e5247ab..fde626590c 100644 --- a/contrib/tools/python3/src/Lib/concurrent/futures/process.py +++ b/contrib/tools/python3/src/Lib/concurrent/futures/process.py @@ -3,7 +3,7 @@ """Implements ProcessPoolExecutor. -The following diagram and text describe the data-flow through the system: +The following diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| @@ -49,13 +49,13 @@ import os from concurrent.futures import _base import queue import multiprocessing as mp -import multiprocessing.connection +import multiprocessing.connection from multiprocessing.queues import Queue import threading import weakref from functools import partial import itertools -import sys +import sys import traceback @@ -65,23 +65,23 @@ _global_shutdown = False class _ThreadWakeup: def __init__(self): - self._closed = False + self._closed = False self._reader, self._writer = mp.Pipe(duplex=False) def close(self): - if not self._closed: - self._closed = True - self._writer.close() - self._reader.close() + if not self._closed: + self._closed = True + self._writer.close() + self._reader.close() def wakeup(self): - if not self._closed: - self._writer.send_bytes(b"") + if not self._closed: + self._writer.send_bytes(b"") def clear(self): - if not self._closed: - while self._reader.poll(): - self._reader.recv_bytes() + if not self._closed: + while self._reader.poll(): + self._reader.recv_bytes() def _python_exit(): @@ -89,17 +89,17 @@ def _python_exit(): _global_shutdown = True items = list(_threads_wakeups.items()) for _, thread_wakeup in items: - # call not protected by ProcessPoolExecutor._shutdown_lock + # call not protected by ProcessPoolExecutor._shutdown_lock thread_wakeup.wakeup() for t, _ in items: t.join() -# Register for `_python_exit()` to be called just before joining all -# non-daemon threads. This is used instead of `atexit.register()` for -# compatibility with subinterpreters, which no longer support daemon threads. -# See bpo-39812 for context. -threading._register_atexit(_python_exit) - +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) + # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for # work while a larger number will make Future.cancel() succeed less frequently @@ -107,12 +107,12 @@ threading._register_atexit(_python_exit) EXTRA_QUEUED_CALLS = 1 -# On Windows, WaitForMultipleObjects is used to wait for processes to finish. -# It can wait on, at most, 63 objects. There is an overhead of two objects: -# - the result queue reader -# - the thread wakeup reader -_MAX_WINDOWS_WORKERS = 63 - 2 - +# On Windows, WaitForMultipleObjects is used to wait for processes to finish. +# It can wait on, at most, 63 objects. There is an overhead of two objects: +# - the result queue reader +# - the thread wakeup reader +_MAX_WINDOWS_WORKERS = 63 - 2 + # Hack to embed stringification of remote traceback in local traceback class _RemoteTraceback(Exception): @@ -157,11 +157,11 @@ class _CallItem(object): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, - thread_wakeup): + def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, + thread_wakeup): self.pending_work_items = pending_work_items - self.shutdown_lock = shutdown_lock - self.thread_wakeup = thread_wakeup + self.shutdown_lock = shutdown_lock + self.thread_wakeup = thread_wakeup super().__init__(max_size, ctx=ctx) def _on_queue_feeder_error(self, e, obj): @@ -169,11 +169,11 @@ class _SafeQueue(Queue): tb = traceback.format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) - with self.shutdown_lock: - self.thread_wakeup.wakeup() - # work_item can be None if another process terminated. In this - # case, the executor_manager_thread fails all work_items - # with BrokenProcessPool + with self.shutdown_lock: + self.thread_wakeup.wakeup() + # work_item can be None if another process terminated. In this + # case, the executor_manager_thread fails all work_items + # with BrokenProcessPool if work_item is not None: work_item.future.set_exception(e) else: @@ -189,7 +189,7 @@ def _get_chunks(*iterables, chunksize): return yield chunk - + def _process_chunk(fn, chunk): """ Processes a chunk of an iterable passed to map. @@ -246,139 +246,139 @@ def _process_worker(call_queue, result_queue, initializer, initargs): _sendback_result(result_queue, call_item.work_id, exception=exc) else: _sendback_result(result_queue, call_item.work_id, result=r) - del r + del r # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore del call_item -class _ExecutorManagerThread(threading.Thread): - """Manages the communication between this process and the worker processes. +class _ExecutorManagerThread(threading.Thread): + """Manages the communication between this process and the worker processes. - The manager is run in a local thread. + The manager is run in a local thread. Args: - executor: A reference to the ProcessPoolExecutor that owns - this thread. A weakref will be own by the manager as well as - references to internal objects used to introspect the state of - the executor. + executor: A reference to the ProcessPoolExecutor that owns + this thread. A weakref will be own by the manager as well as + references to internal objects used to introspect the state of + the executor. """ - def __init__(self, executor): - # Store references to necessary internals of the executor. - - # A _ThreadWakeup to allow waking up the queue_manager_thread from the - # main Thread and avoid deadlocks caused by permanently locked queues. - self.thread_wakeup = executor._executor_manager_thread_wakeup - self.shutdown_lock = executor._shutdown_lock - - # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used - # to determine if the ProcessPoolExecutor has been garbage collected - # and that the manager can exit. - # When the executor gets garbage collected, the weakref callback - # will wake up the queue management thread so that it can terminate - # if there is no pending work item. - def weakref_cb(_, - thread_wakeup=self.thread_wakeup, - shutdown_lock=self.shutdown_lock): - mp.util.debug('Executor collected: triggering callback for' - ' QueueManager wakeup') - with shutdown_lock: - thread_wakeup.wakeup() - - self.executor_reference = weakref.ref(executor, weakref_cb) - - # A list of the ctx.Process instances used as workers. - self.processes = executor._processes - - # A ctx.Queue that will be filled with _CallItems derived from - # _WorkItems for processing by the process workers. - self.call_queue = executor._call_queue - - # A ctx.SimpleQueue of _ResultItems generated by the process workers. - self.result_queue = executor._result_queue - - # A queue.Queue of work ids e.g. Queue([5, 6, ...]). - self.work_ids_queue = executor._work_ids - - # A dict mapping work ids to _WorkItems e.g. - # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - self.pending_work_items = executor._pending_work_items - - super().__init__() - - def run(self): - # Main loop for the executor manager thread. - - while True: - self.add_call_item_to_queue() - - result_item, is_broken, cause = self.wait_result_broken_or_wakeup() - - if is_broken: - self.terminate_broken(cause) - return - if result_item is not None: - self.process_result_item(result_item) - # Delete reference to result_item to avoid keeping references - # while waiting on new results. - del result_item - - # attempt to increment idle process count - executor = self.executor_reference() - if executor is not None: - executor._idle_worker_semaphore.release() - del executor - - if self.is_shutting_down(): - self.flag_executor_shutting_down() - - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not self.pending_work_items: - self.join_executor_internals() - return - - def add_call_item_to_queue(self): - # Fills call_queue with _WorkItems from pending_work_items. - # This function never blocks. - while True: - if self.call_queue.full(): - return - try: - work_id = self.work_ids_queue.get(block=False) - except queue.Empty: - return - else: - work_item = self.pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - self.call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del self.pending_work_items[work_id] - continue - - def wait_result_broken_or_wakeup(self): + def __init__(self, executor): + # Store references to necessary internals of the executor. + + # A _ThreadWakeup to allow waking up the queue_manager_thread from the + # main Thread and avoid deadlocks caused by permanently locked queues. + self.thread_wakeup = executor._executor_manager_thread_wakeup + self.shutdown_lock = executor._shutdown_lock + + # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used + # to determine if the ProcessPoolExecutor has been garbage collected + # and that the manager can exit. + # When the executor gets garbage collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. + def weakref_cb(_, + thread_wakeup=self.thread_wakeup, + shutdown_lock=self.shutdown_lock): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + with shutdown_lock: + thread_wakeup.wakeup() + + self.executor_reference = weakref.ref(executor, weakref_cb) + + # A list of the ctx.Process instances used as workers. + self.processes = executor._processes + + # A ctx.Queue that will be filled with _CallItems derived from + # _WorkItems for processing by the process workers. + self.call_queue = executor._call_queue + + # A ctx.SimpleQueue of _ResultItems generated by the process workers. + self.result_queue = executor._result_queue + + # A queue.Queue of work ids e.g. Queue([5, 6, ...]). + self.work_ids_queue = executor._work_ids + + # A dict mapping work ids to _WorkItems e.g. + # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + self.pending_work_items = executor._pending_work_items + + super().__init__() + + def run(self): + # Main loop for the executor manager thread. + + while True: + self.add_call_item_to_queue() + + result_item, is_broken, cause = self.wait_result_broken_or_wakeup() + + if is_broken: + self.terminate_broken(cause) + return + if result_item is not None: + self.process_result_item(result_item) + # Delete reference to result_item to avoid keeping references + # while waiting on new results. + del result_item + + # attempt to increment idle process count + executor = self.executor_reference() + if executor is not None: + executor._idle_worker_semaphore.release() + del executor + + if self.is_shutting_down(): + self.flag_executor_shutting_down() + + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not self.pending_work_items: + self.join_executor_internals() + return + + def add_call_item_to_queue(self): + # Fills call_queue with _WorkItems from pending_work_items. + # This function never blocks. + while True: + if self.call_queue.full(): + return + try: + work_id = self.work_ids_queue.get(block=False) + except queue.Empty: + return + else: + work_item = self.pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + self.call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del self.pending_work_items[work_id] + continue + + def wait_result_broken_or_wakeup(self): # Wait for a result to be ready in the result_queue while checking # that all worker processes are still running, or for a wake up # signal send. The wake up signals come either from new tasks being # submitted, from the executor being shutdown/gc-ed, or from the # shutdown of the python interpreter. - result_reader = self.result_queue._reader - assert not self.thread_wakeup._closed - wakeup_reader = self.thread_wakeup._reader - readers = [result_reader, wakeup_reader] - worker_sentinels = [p.sentinel for p in list(self.processes.values())] - ready = mp.connection.wait(readers + worker_sentinels) + result_reader = self.result_queue._reader + assert not self.thread_wakeup._closed + wakeup_reader = self.thread_wakeup._reader + readers = [result_reader, wakeup_reader] + worker_sentinels = [p.sentinel for p in list(self.processes.values())] + ready = mp.connection.wait(readers + worker_sentinels) cause = None is_broken = True - result_item = None + result_item = None if result_reader in ready: try: result_item = result_reader.recv() @@ -388,28 +388,28 @@ class _ExecutorManagerThread(threading.Thread): elif wakeup_reader in ready: is_broken = False - - with self.shutdown_lock: - self.thread_wakeup.clear() - - return result_item, is_broken, cause - - def process_result_item(self, result_item): - # Process the received a result_item. This can be either the PID of a - # worker that exited gracefully or a _ResultItem - + + with self.shutdown_lock: + self.thread_wakeup.clear() + + return result_item, is_broken, cause + + def process_result_item(self, result_item): + # Process the received a result_item. This can be either the PID of a + # worker that exited gracefully or a _ResultItem + if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) - assert self.is_shutting_down() - p = self.processes.pop(result_item) + assert self.is_shutting_down() + p = self.processes.pop(result_item) p.join() - if not self.processes: - self.join_executor_internals() + if not self.processes: + self.join_executor_internals() return - else: - # Received a _ResultItem so mark the future as completed. - work_item = self.pending_work_items.pop(result_item.work_id, None) + else: + # Received a _ResultItem so mark the future as completed. + work_item = self.pending_work_items.pop(result_item.work_id, None) # work_item can be None if another process terminated (see above) if work_item is not None: if result_item.exception: @@ -417,111 +417,111 @@ class _ExecutorManagerThread(threading.Thread): else: work_item.future.set_result(result_item.result) - def is_shutting_down(self): - # Check whether we should start shutting down the executor. - executor = self.executor_reference() + def is_shutting_down(self): + # Check whether we should start shutting down the executor. + executor = self.executor_reference() # No more work items can be added if: # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - return (_global_shutdown or executor is None - or executor._shutdown_thread) - - def terminate_broken(self, cause): - # Terminate the executor because it is in a broken state. The cause - # argument can be used to display more information on the error that - # lead the executor into becoming broken. - - # Mark the process pool broken so that submits fail right now. - executor = self.executor_reference() - if executor is not None: - executor._broken = ('A child process terminated ' - 'abruptly, the process pool is not ' - 'usable anymore') - executor._shutdown_thread = True - executor = None - - # All pending tasks are to be marked failed with the following - # BrokenProcessPool error - bpe = BrokenProcessPool("A process in the process pool was " - "terminated abruptly while the future was " - "running or pending.") - if cause is not None: - bpe.__cause__ = _RemoteTraceback( - f"\n'''\n{''.join(cause)}'''") - - # Mark pending tasks as failed. - for work_id, work_item in self.pending_work_items.items(): - work_item.future.set_exception(bpe) - # Delete references to object. See issue16284 - del work_item - self.pending_work_items.clear() - - # Terminate remaining workers forcibly: the queues or their - # locks may be in a dirty state and block forever. - for p in self.processes.values(): - p.terminate() - - # clean up resources - self.join_executor_internals() - - def flag_executor_shutting_down(self): - # Flag the executor as shutting down and cancel remaining tasks if - # requested as early as possible if it is not gc-ed yet. - executor = self.executor_reference() - if executor is not None: - executor._shutdown_thread = True - # Cancel pending work items if requested. - if executor._cancel_pending_futures: - # Cancel all pending futures and update pending_work_items - # to only have futures that are currently running. - new_pending_work_items = {} - for work_id, work_item in self.pending_work_items.items(): - if not work_item.future.cancel(): - new_pending_work_items[work_id] = work_item - self.pending_work_items = new_pending_work_items - # Drain work_ids_queue since we no longer need to - # add items to the call queue. - while True: - try: - self.work_ids_queue.get_nowait() - except queue.Empty: - break - # Make sure we do this only once to not waste time looping - # on running processes over and over. - executor._cancel_pending_futures = False - - def shutdown_workers(self): - n_children_to_stop = self.get_n_children_alive() - n_sentinels_sent = 0 - # Send the right number of sentinels, to make sure all children are - # properly terminated. - while (n_sentinels_sent < n_children_to_stop - and self.get_n_children_alive() > 0): - for i in range(n_children_to_stop - n_sentinels_sent): - try: - self.call_queue.put_nowait(None) - n_sentinels_sent += 1 - except queue.Full: - break - - def join_executor_internals(self): - self.shutdown_workers() - # Release the queue's resources as soon as possible. - self.call_queue.close() - self.call_queue.join_thread() - with self.shutdown_lock: - self.thread_wakeup.close() - # If .join() is not called on the created processes then - # some ctx.Queue methods may deadlock on Mac OS X. - for p in self.processes.values(): - p.join() - - def get_n_children_alive(self): - # This is an upper bound on the number of children alive. - return sum(p.is_alive() for p in self.processes.values()) - - + return (_global_shutdown or executor is None + or executor._shutdown_thread) + + def terminate_broken(self, cause): + # Terminate the executor because it is in a broken state. The cause + # argument can be used to display more information on the error that + # lead the executor into becoming broken. + + # Mark the process pool broken so that submits fail right now. + executor = self.executor_reference() + if executor is not None: + executor._broken = ('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') + executor._shutdown_thread = True + executor = None + + # All pending tasks are to be marked failed with the following + # BrokenProcessPool error + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") + + # Mark pending tasks as failed. + for work_id, work_item in self.pending_work_items.items(): + work_item.future.set_exception(bpe) + # Delete references to object. See issue16284 + del work_item + self.pending_work_items.clear() + + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + for p in self.processes.values(): + p.terminate() + + # clean up resources + self.join_executor_internals() + + def flag_executor_shutting_down(self): + # Flag the executor as shutting down and cancel remaining tasks if + # requested as early as possible if it is not gc-ed yet. + executor = self.executor_reference() + if executor is not None: + executor._shutdown_thread = True + # Cancel pending work items if requested. + if executor._cancel_pending_futures: + # Cancel all pending futures and update pending_work_items + # to only have futures that are currently running. + new_pending_work_items = {} + for work_id, work_item in self.pending_work_items.items(): + if not work_item.future.cancel(): + new_pending_work_items[work_id] = work_item + self.pending_work_items = new_pending_work_items + # Drain work_ids_queue since we no longer need to + # add items to the call queue. + while True: + try: + self.work_ids_queue.get_nowait() + except queue.Empty: + break + # Make sure we do this only once to not waste time looping + # on running processes over and over. + executor._cancel_pending_futures = False + + def shutdown_workers(self): + n_children_to_stop = self.get_n_children_alive() + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while (n_sentinels_sent < n_children_to_stop + and self.get_n_children_alive() > 0): + for i in range(n_children_to_stop - n_sentinels_sent): + try: + self.call_queue.put_nowait(None) + n_sentinels_sent += 1 + except queue.Full: + break + + def join_executor_internals(self): + self.shutdown_workers() + # Release the queue's resources as soon as possible. + self.call_queue.close() + self.call_queue.join_thread() + with self.shutdown_lock: + self.thread_wakeup.close() + # If .join() is not called on the created processes then + # some ctx.Queue methods may deadlock on Mac OS X. + for p in self.processes.values(): + p.join() + + def get_n_children_alive(self): + # This is an upper bound on the number of children alive. + return sum(p.is_alive() for p in self.processes.values()) + + _system_limits_checked = False _system_limited = None @@ -580,23 +580,23 @@ class ProcessPoolExecutor(_base.Executor): worker processes will be created as the machine has processors. mp_context: A multiprocessing context to launch the workers. This object should provide SimpleQueue, Queue and Process. - initializer: A callable used to initialize worker processes. + initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. """ _check_system_limits() if max_workers is None: self._max_workers = os.cpu_count() or 1 - if sys.platform == 'win32': - self._max_workers = min(_MAX_WINDOWS_WORKERS, - self._max_workers) + if sys.platform == 'win32': + self._max_workers = min(_MAX_WINDOWS_WORKERS, + self._max_workers) else: if max_workers <= 0: raise ValueError("max_workers must be greater than 0") - elif (sys.platform == 'win32' and - max_workers > _MAX_WINDOWS_WORKERS): - raise ValueError( - f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") + elif (sys.platform == 'win32' and + max_workers > _MAX_WINDOWS_WORKERS): + raise ValueError( + f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") self._max_workers = max_workers @@ -610,7 +610,7 @@ class ProcessPoolExecutor(_base.Executor): self._initargs = initargs # Management thread - self._executor_manager_thread = None + self._executor_manager_thread = None # Map of pids to processes self._processes = {} @@ -618,22 +618,22 @@ class ProcessPoolExecutor(_base.Executor): # Shutdown is a two-step process. self._shutdown_thread = False self._shutdown_lock = threading.Lock() - self._idle_worker_semaphore = threading.Semaphore(0) + self._idle_worker_semaphore = threading.Semaphore(0) self._broken = False self._queue_count = 0 self._pending_work_items = {} - self._cancel_pending_futures = False - - # _ThreadWakeup is a communication channel used to interrupt the wait - # of the main loop of executor_manager_thread from another thread (e.g. - # when calling executor.submit or executor.shutdown). We do not use the - # _result_queue to send wakeup signals to the executor_manager_thread - # as it could result in a deadlock if a worker process dies with the - # _result_queue write lock still acquired. - # - # _shutdown_lock must be locked to access _ThreadWakeup. - self._executor_manager_thread_wakeup = _ThreadWakeup() - + self._cancel_pending_futures = False + + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of executor_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send wakeup signals to the executor_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + # + # _shutdown_lock must be locked to access _ThreadWakeup. + self._executor_manager_thread_wakeup = _ThreadWakeup() + # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big @@ -641,9 +641,9 @@ class ProcessPoolExecutor(_base.Executor): queue_size = self._max_workers + EXTRA_QUEUED_CALLS self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, - pending_work_items=self._pending_work_items, - shutdown_lock=self._shutdown_lock, - thread_wakeup=self._executor_manager_thread_wakeup) + pending_work_items=self._pending_work_items, + shutdown_lock=self._shutdown_lock, + thread_wakeup=self._executor_manager_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. @@ -651,21 +651,21 @@ class ProcessPoolExecutor(_base.Executor): self._result_queue = mp_context.SimpleQueue() self._work_ids = queue.Queue() - def _start_executor_manager_thread(self): - if self._executor_manager_thread is None: + def _start_executor_manager_thread(self): + if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. - self._executor_manager_thread = _ExecutorManagerThread(self) - self._executor_manager_thread.start() - _threads_wakeups[self._executor_manager_thread] = \ - self._executor_manager_thread_wakeup + self._executor_manager_thread = _ExecutorManagerThread(self) + self._executor_manager_thread.start() + _threads_wakeups[self._executor_manager_thread] = \ + self._executor_manager_thread_wakeup def _adjust_process_count(self): - # if there's an idle process, we don't need to spawn a new one. - if self._idle_worker_semaphore.acquire(blocking=False): - return - - process_count = len(self._processes) - if process_count < self._max_workers: + # if there's an idle process, we don't need to spawn a new one. + if self._idle_worker_semaphore.acquire(blocking=False): + return + + process_count = len(self._processes) + if process_count < self._max_workers: p = self._mp_context.Process( target=_process_worker, args=(self._call_queue, @@ -675,7 +675,7 @@ class ProcessPoolExecutor(_base.Executor): p.start() self._processes[p.pid] = p - def submit(self, fn, /, *args, **kwargs): + def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: if self._broken: raise BrokenProcessPool(self._broken) @@ -692,10 +692,10 @@ class ProcessPoolExecutor(_base.Executor): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._executor_manager_thread_wakeup.wakeup() + self._executor_manager_thread_wakeup.wakeup() - self._adjust_process_count() - self._start_executor_manager_thread() + self._adjust_process_count() + self._start_executor_manager_thread() return f submit.__doc__ = _base.Executor.submit.__doc__ @@ -728,24 +728,24 @@ class ProcessPoolExecutor(_base.Executor): timeout=timeout) return _chain_from_iterable_of_lists(results) - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: - self._cancel_pending_futures = cancel_futures + self._cancel_pending_futures = cancel_futures self._shutdown_thread = True - if self._executor_manager_thread_wakeup is not None: - # Wake up queue management thread - self._executor_manager_thread_wakeup.wakeup() - - if self._executor_manager_thread is not None and wait: - self._executor_manager_thread.join() + if self._executor_manager_thread_wakeup is not None: + # Wake up queue management thread + self._executor_manager_thread_wakeup.wakeup() + + if self._executor_manager_thread is not None and wait: + self._executor_manager_thread.join() # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. - self._executor_manager_thread = None - self._call_queue = None - if self._result_queue is not None and wait: - self._result_queue.close() + self._executor_manager_thread = None + self._call_queue = None + if self._result_queue is not None and wait: + self._result_queue.close() self._result_queue = None self._processes = None - self._executor_manager_thread_wakeup = None + self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/thread.py b/contrib/tools/python3/src/Lib/concurrent/futures/thread.py index 51c942f51a..0ed4a2c091 100644 --- a/contrib/tools/python3/src/Lib/concurrent/futures/thread.py +++ b/contrib/tools/python3/src/Lib/concurrent/futures/thread.py @@ -9,40 +9,40 @@ from concurrent.futures import _base import itertools import queue import threading -import types +import types import weakref import os _threads_queues = weakref.WeakKeyDictionary() _shutdown = False -# Lock that ensures that new workers are not created while the interpreter is -# shutting down. Must be held while mutating _threads_queues and _shutdown. -_global_shutdown_lock = threading.Lock() +# Lock that ensures that new workers are not created while the interpreter is +# shutting down. Must be held while mutating _threads_queues and _shutdown. +_global_shutdown_lock = threading.Lock() def _python_exit(): global _shutdown - with _global_shutdown_lock: - _shutdown = True + with _global_shutdown_lock: + _shutdown = True items = list(_threads_queues.items()) for t, q in items: q.put(None) for t, q in items: t.join() -# Register for `_python_exit()` to be called just before joining all -# non-daemon threads. This is used instead of `atexit.register()` for -# compatibility with subinterpreters, which no longer support daemon threads. -# See bpo-39812 for context. -threading._register_atexit(_python_exit) - -# At fork, reinitialize the `_global_shutdown_lock` lock in the child process -if hasattr(os, 'register_at_fork'): - os.register_at_fork(before=_global_shutdown_lock.acquire, - after_in_child=_global_shutdown_lock._at_fork_reinit, - after_in_parent=_global_shutdown_lock.release) +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) +# At fork, reinitialize the `_global_shutdown_lock` lock in the child process +if hasattr(os, 'register_at_fork'): + os.register_at_fork(before=_global_shutdown_lock.acquire, + after_in_child=_global_shutdown_lock._at_fork_reinit, + after_in_parent=_global_shutdown_lock.release) + class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future @@ -63,9 +63,9 @@ class _WorkItem(object): else: self.future.set_result(result) - __class_getitem__ = classmethod(types.GenericAlias) - + __class_getitem__ = classmethod(types.GenericAlias) + def _worker(executor_reference, work_queue, initializer, initargs): if initializer is not None: try: @@ -83,14 +83,14 @@ def _worker(executor_reference, work_queue, initializer, initargs): work_item.run() # Delete references to object. See issue16284 del work_item - - # attempt to increment idle count - executor = executor_reference() - if executor is not None: - executor._idle_semaphore.release() - del executor + + # attempt to increment idle count + executor = executor_reference() + if executor is not None: + executor._idle_semaphore.release() + del executor continue - + executor = executor_reference() # Exit if: # - The interpreter is shutting down OR @@ -128,18 +128,18 @@ class ThreadPoolExecutor(_base.Executor): max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. - initializer: A callable used to initialize worker threads. + initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer. """ if max_workers is None: - # ThreadPoolExecutor is often used to: - # * CPU bound task which releases GIL - # * I/O bound task (which releases GIL, of course) - # - # We use cpu_count + 4 for both types of tasks. - # But we limit it to 32 to avoid consuming surprisingly large resource - # on many core machine. - max_workers = min(32, (os.cpu_count() or 1) + 4) + # ThreadPoolExecutor is often used to: + # * CPU bound task which releases GIL + # * I/O bound task (which releases GIL, of course) + # + # We use cpu_count + 4 for both types of tasks. + # But we limit it to 32 to avoid consuming surprisingly large resource + # on many core machine. + max_workers = min(32, (os.cpu_count() or 1) + 4) if max_workers <= 0: raise ValueError("max_workers must be greater than 0") @@ -148,7 +148,7 @@ class ThreadPoolExecutor(_base.Executor): self._max_workers = max_workers self._work_queue = queue.SimpleQueue() - self._idle_semaphore = threading.Semaphore(0) + self._idle_semaphore = threading.Semaphore(0) self._threads = set() self._broken = False self._shutdown = False @@ -158,8 +158,8 @@ class ThreadPoolExecutor(_base.Executor): self._initializer = initializer self._initargs = initargs - def submit(self, fn, /, *args, **kwargs): - with self._shutdown_lock, _global_shutdown_lock: + def submit(self, fn, /, *args, **kwargs): + with self._shutdown_lock, _global_shutdown_lock: if self._broken: raise BrokenThreadPool(self._broken) @@ -178,15 +178,15 @@ class ThreadPoolExecutor(_base.Executor): submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): - # if idle threads are available, don't spin new threads - if self._idle_semaphore.acquire(timeout=0): - return - + # if idle threads are available, don't spin new threads + if self._idle_semaphore.acquire(timeout=0): + return + # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) - + num_threads = len(self._threads) if num_threads < self._max_workers: thread_name = '%s_%d' % (self._thread_name_prefix or self, @@ -213,22 +213,22 @@ class ThreadPoolExecutor(_base.Executor): if work_item is not None: work_item.future.set_exception(BrokenThreadPool(self._broken)) - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: self._shutdown = True - if cancel_futures: - # Drain all work items from the queue, and then cancel their - # associated futures. - while True: - try: - work_item = self._work_queue.get_nowait() - except queue.Empty: - break - if work_item is not None: - work_item.future.cancel() - - # Send a wake-up to prevent threads calling - # _work_queue.get(block=True) from permanently blocking. + if cancel_futures: + # Drain all work items from the queue, and then cancel their + # associated futures. + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.cancel() + + # Send a wake-up to prevent threads calling + # _work_queue.get(block=True) from permanently blocking. self._work_queue.put(None) if wait: for t in self._threads: |