diff options
author | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:24:06 +0300 |
---|---|---|
committer | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:41:34 +0300 |
commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/tools/python3/src/Lib/concurrent | |
parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
download | ydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz |
add ydb deps
Diffstat (limited to 'contrib/tools/python3/src/Lib/concurrent')
5 files changed, 1789 insertions, 0 deletions
diff --git a/contrib/tools/python3/src/Lib/concurrent/__init__.py b/contrib/tools/python3/src/Lib/concurrent/__init__.py new file mode 100644 index 0000000000..196d378857 --- /dev/null +++ b/contrib/tools/python3/src/Lib/concurrent/__init__.py @@ -0,0 +1 @@ +# This directory is a Python package. diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py b/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py new file mode 100644 index 0000000000..292e886d5a --- /dev/null +++ b/contrib/tools/python3/src/Lib/concurrent/futures/__init__.py @@ -0,0 +1,53 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +from concurrent.futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + InvalidStateError, + BrokenExecutor, + Future, + Executor, + wait, + as_completed) + +__all__ = ( + 'FIRST_COMPLETED', + 'FIRST_EXCEPTION', + 'ALL_COMPLETED', + 'CancelledError', + 'TimeoutError', + 'BrokenExecutor', + 'Future', + 'Executor', + 'wait', + 'as_completed', + 'ProcessPoolExecutor', + 'ThreadPoolExecutor', +) + + +def __dir__(): + return __all__ + ('__author__', '__doc__') + + +def __getattr__(name): + global ProcessPoolExecutor, ThreadPoolExecutor + + if name == 'ProcessPoolExecutor': + from .process import ProcessPoolExecutor as pe + ProcessPoolExecutor = pe + return pe + + if name == 'ThreadPoolExecutor': + from .thread import ThreadPoolExecutor as te + ThreadPoolExecutor = te + return te + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/_base.py b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py new file mode 100644 index 0000000000..6742a07753 --- /dev/null +++ b/contrib/tools/python3/src/Lib/concurrent/futures/_base.py @@ -0,0 +1,654 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +import collections +import logging +import threading +import time +import types + +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +# Logger for internal use by the futures package. +LOGGER = logging.getLogger("concurrent.futures") + +class Error(Exception): + """Base class for all future-related exceptions.""" + pass + +class CancelledError(Error): + """The Future was cancelled.""" + pass + +TimeoutError = TimeoutError # make local alias for the standard exception + +class InvalidStateError(Error): + """The operation is not allowed in this state.""" + pass + +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" + def __init__(self): + self.event = threading.Event() + self.finished_futures = [] + + def add_result(self, future): + self.finished_futures.append(future) + + def add_exception(self, future): + self.finished_futures.append(future) + + def add_cancelled(self, future): + self.finished_futures.append(future) + +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self, future): + super().add_result(future) + self.event.set() + + def add_exception(self, future): + super().add_exception(future) + self.event.set() + + def add_cancelled(self, future): + super().add_cancelled(future) + self.event.set() + +class _AllCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + self.lock = threading.Lock() + super().__init__() + + def _decrement_pending_calls(self): + with self.lock: + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_result(self, future): + super().add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super().add_exception(future) + if self.stop_on_exception: + self.event.set() + else: + self._decrement_pending_calls() + + def add_cancelled(self, future): + super().add_cancelled(future) + self._decrement_pending_calls() + +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" + + def __init__(self, futures): + self.futures = sorted(futures, key=id) + + def __enter__(self): + for future in self.futures: + future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) + + for f in fs: + f._waiters.append(waiter) + + return waiter + + +def _yield_finished_futures(fs, waiter, ref_collect): + """ + Iterate on the list *fs*, yielding finished futures one by one in + reverse order. + Before yielding a future, *waiter* is removed from its waiters + and the future is removed from each set in the collection of sets + *ref_collect*. + + The aim of this function is to avoid keeping stale references after + the future is yielded and before the iterator resumes. + """ + while fs: + f = fs[-1] + for futures_set in ref_collect: + futures_set.remove(f) + with f._condition: + f._waiters.remove(waiter) + del f + # Careful not to keep a reference to the popped value + yield fs.pop() + + +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). If any given Futures are duplicated, they will be returned + once. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + """ + if timeout is not None: + end_time = timeout + time.monotonic() + + fs = set(fs) + total_futures = len(fs) + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = fs - finished + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) + finished = list(finished) + try: + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs,)) + + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.monotonic() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), total_futures)) + + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() + + # reverse to keep finishing order + finished.reverse() + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs, pending)) + + finally: + # Remove waiter from unfinished futures + for f in fs: + with f._condition: + f._waiters.remove(waiter) + +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when this function should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + + Returns: + A named 2-tuple of sets. The first set, named 'done', contains the + futures that completed (is finished or cancelled) before the wait + completed. The second set, named 'not_done', contains uncompleted + futures. Duplicate futures given to *fs* are removed and will be + returned only once. + """ + fs = set(fs) + with _AcquireFutures(fs): + done = {f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]} + not_done = fs - done + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done + if not f.cancelled() and f.exception() is not None): + return DoneAndNotDoneFutures(done, not_done) + + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) + + waiter = _create_and_install_waiters(fs, return_when) + + waiter.event.wait(timeout) + for f in fs: + with f._condition: + f._waiters.remove(waiter) + + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, fs - done) + + +def _result_or_cancel(fut, timeout=None): + try: + try: + return fut.result(timeout) + finally: + fut.cancel() + finally: + # Break a reference cycle with the exception in self._exception + del fut + + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._waiters = [] + self._done_callbacks = [] + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + try: + callback(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '<%s at %#x state=%s raised %s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '<%s at %#x state=%s returned %s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '<%s at %#x state=%s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state]) + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + self._condition.notify_all() + + self._invoke_callbacks() + return True + + def cancelled(self): + """Return True if the future was cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + """Return True if the future is currently executing.""" + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True if the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + try: + raise self._exception + finally: + # Break a reference cycle with the exception in self._exception + self = None + else: + return self._result + + def add_done_callback(self, fn): + """Attaches a callable that will be called when the future finishes. + + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. + """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + try: + fn(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + try: + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + finally: + # Break a reference cycle with the exception in self._exception + self = None + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutError() + + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. + + Should only be used by Executor implementations and unit tests. + + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. + + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. + + Raises: + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. + """ + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True + else: + LOGGER.critical('Future %s in unexpected state: %s', + id(self), + self._state) + raise RuntimeError('Future in unexpected state') + + def set_result(self, result): + """Sets the return value of work associated with the future. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) + self._exception = exception + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() + + __class_getitem__ = classmethod(types.GenericAlias) + +class Executor(object): + """This is an abstract base class for concrete asynchronous executors.""" + + def submit(self, fn, /, *args, **kwargs): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Returns: + A Future representing the given call. + """ + raise NotImplementedError() + + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns an iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: The size of the chunks the iterable will be broken into + before being passed to a child process. This argument is only + used by ProcessPoolExecutor; it is ignored by + ThreadPoolExecutor. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if timeout is not None: + end_time = timeout + time.monotonic() + + fs = [self.submit(fn, *args) for args in zip(*iterables)] + + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + # reverse to keep finishing order + fs.reverse() + while fs: + # Careful not to keep a reference to the popped future + if timeout is None: + yield _result_or_cancel(fs.pop()) + else: + yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) + finally: + for future in fs: + future.cancel() + return result_iterator() + + def shutdown(self, wait=True, *, cancel_futures=False): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. + cancel_futures: If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. + """ + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown(wait=True) + return False + + +class BrokenExecutor(RuntimeError): + """ + Raised when a executor has become non-functional after a severe failure. + """ diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/process.py b/contrib/tools/python3/src/Lib/concurrent/futures/process.py new file mode 100644 index 0000000000..321608ab17 --- /dev/null +++ b/contrib/tools/python3/src/Lib/concurrent/futures/process.py @@ -0,0 +1,845 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ProcessPoolExecutor. + +The following diagram and text describe the data-flow through the system: + +|======================= In-process =====================|== Out-of-process ==| + ++----------+ +----------+ +--------+ +-----------+ +---------+ +| | => | Work Ids | | | | Call Q | | Process | +| | +----------+ | | +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | +| | | 7 | | | | ... | | | +| Process | | ... | | Local | +-----------+ | Process | +| Pool | +----------+ | Worker | | #1..n | +| Executor | | Thread | | | +| | +----------- + | | +-----------+ | | +| | <=> | Work Items | <=> | | <= | Result Q | <= | | +| | +------------+ | | +-----------+ | | +| | | 6: call() | | | | ... | | | +| | | future | | | | 4, result | | | +| | | ... | | | | 3, except | | | ++----------+ +------------+ +--------+ +-----------+ +---------+ + +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict +- adds the id of the _WorkItem to the "Work Ids" queue + +Local worker thread: +- reads work ids from the "Work Ids" queue and looks up the corresponding + WorkItem from the "Work Items" dict: if the work item has been cancelled then + it is simply removed from the dict, otherwise it is repackaged as a + _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" + until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because + calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). +- reads _ResultItems from "Result Q", updates the future stored in the + "Work Items" dict and deletes the dict entry + +Process #1..n: +- reads _CallItems from "Call Q", executes the calls, and puts the resulting + _ResultItems in "Result Q" +""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +import os +from concurrent.futures import _base +import queue +import multiprocessing as mp +import multiprocessing.connection +from multiprocessing.queues import Queue +import threading +import weakref +from functools import partial +import itertools +import sys +from traceback import format_exception + + +_threads_wakeups = weakref.WeakKeyDictionary() +_global_shutdown = False + + +class _ThreadWakeup: + def __init__(self): + self._closed = False + self._reader, self._writer = mp.Pipe(duplex=False) + + def close(self): + if not self._closed: + self._closed = True + self._writer.close() + self._reader.close() + + def wakeup(self): + if not self._closed: + self._writer.send_bytes(b"") + + def clear(self): + if not self._closed: + while self._reader.poll(): + self._reader.recv_bytes() + + +def _python_exit(): + global _global_shutdown + _global_shutdown = True + items = list(_threads_wakeups.items()) + for _, thread_wakeup in items: + # call not protected by ProcessPoolExecutor._shutdown_lock + thread_wakeup.wakeup() + for t, _ in items: + t.join() + +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) + +# Controls how many more calls than processes will be queued in the call queue. +# A smaller number will mean that processes spend more time idle waiting for +# work while a larger number will make Future.cancel() succeed less frequently +# (Futures in the call queue cannot be cancelled). +EXTRA_QUEUED_CALLS = 1 + + +# On Windows, WaitForMultipleObjects is used to wait for processes to finish. +# It can wait on, at most, 63 objects. There is an overhead of two objects: +# - the result queue reader +# - the thread wakeup reader +_MAX_WINDOWS_WORKERS = 63 - 2 + +# Hack to embed stringification of remote traceback in local traceback + +class _RemoteTraceback(Exception): + def __init__(self, tb): + self.tb = tb + def __str__(self): + return self.tb + +class _ExceptionWithTraceback: + def __init__(self, exc, tb): + tb = ''.join(format_exception(type(exc), exc, tb)) + self.exc = exc + # Traceback object needs to be garbage-collected as its frames + # contain references to all the objects in the exception scope + self.exc.__traceback__ = None + self.tb = '\n"""\n%s"""' % tb + def __reduce__(self): + return _rebuild_exc, (self.exc, self.tb) + +def _rebuild_exc(exc, tb): + exc.__cause__ = _RemoteTraceback(tb) + return exc + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None, exit_pid=None): + self.work_id = work_id + self.exception = exception + self.result = result + self.exit_pid = exit_pid + +class _CallItem(object): + def __init__(self, work_id, fn, args, kwargs): + self.work_id = work_id + self.fn = fn + self.args = args + self.kwargs = kwargs + + +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, + thread_wakeup): + self.pending_work_items = pending_work_items + self.shutdown_lock = shutdown_lock + self.thread_wakeup = thread_wakeup + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + with self.shutdown_lock: + self.thread_wakeup.wakeup() + # work_item can be None if another process terminated. In this + # case, the executor_manager_thread fails all work_items + # with BrokenProcessPool + if work_item is not None: + work_item.future.set_exception(e) + else: + super()._on_queue_feeder_error(e, obj) + + +def _get_chunks(*iterables, chunksize): + """ Iterates over zip()ed iterables in chunks. """ + it = zip(*iterables) + while True: + chunk = tuple(itertools.islice(it, chunksize)) + if not chunk: + return + yield chunk + + +def _process_chunk(fn, chunk): + """ Processes a chunk of an iterable passed to map. + + Runs the function passed to map() on a chunk of the + iterable passed to map. + + This function is run in a separate process. + + """ + return [fn(*args) for args in chunk] + + +def _sendback_result(result_queue, work_id, result=None, exception=None, + exit_pid=None): + """Safely send back the given result or exception""" + try: + result_queue.put(_ResultItem(work_id, result=result, + exception=exception, exit_pid=exit_pid)) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(work_id, exception=exc, + exit_pid=exit_pid)) + + +def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None): + """Evaluates calls from call_queue and places the results in result_queue. + + This worker is run in a separate process. + + Args: + call_queue: A ctx.Queue of _CallItems that will be read and + evaluated by the worker. + result_queue: A ctx.Queue of _ResultItems that will written + to by the worker. + initializer: A callable initializer, or None + initargs: A tuple of args for the initializer + """ + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + # The parent will notice that the process stopped and + # mark the pool broken + return + num_tasks = 0 + exit_pid = None + while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(os.getpid()) + return + + if max_tasks is not None: + num_tasks += 1 + if num_tasks >= max_tasks: + exit_pid = os.getpid() + + try: + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + _sendback_result(result_queue, call_item.work_id, exception=exc, + exit_pid=exit_pid) + else: + _sendback_result(result_queue, call_item.work_id, result=r, + exit_pid=exit_pid) + del r + + # Liberate the resource as soon as possible, to avoid holding onto + # open files or shared memory that is not needed anymore + del call_item + + if exit_pid is not None: + return + + +class _ExecutorManagerThread(threading.Thread): + """Manages the communication between this process and the worker processes. + + The manager is run in a local thread. + + Args: + executor: A reference to the ProcessPoolExecutor that owns + this thread. A weakref will be own by the manager as well as + references to internal objects used to introspect the state of + the executor. + """ + + def __init__(self, executor): + # Store references to necessary internals of the executor. + + # A _ThreadWakeup to allow waking up the queue_manager_thread from the + # main Thread and avoid deadlocks caused by permanently locked queues. + self.thread_wakeup = executor._executor_manager_thread_wakeup + self.shutdown_lock = executor._shutdown_lock + + # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used + # to determine if the ProcessPoolExecutor has been garbage collected + # and that the manager can exit. + # When the executor gets garbage collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. + def weakref_cb(_, + thread_wakeup=self.thread_wakeup, + shutdown_lock=self.shutdown_lock): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + with shutdown_lock: + thread_wakeup.wakeup() + + self.executor_reference = weakref.ref(executor, weakref_cb) + + # A list of the ctx.Process instances used as workers. + self.processes = executor._processes + + # A ctx.Queue that will be filled with _CallItems derived from + # _WorkItems for processing by the process workers. + self.call_queue = executor._call_queue + + # A ctx.SimpleQueue of _ResultItems generated by the process workers. + self.result_queue = executor._result_queue + + # A queue.Queue of work ids e.g. Queue([5, 6, ...]). + self.work_ids_queue = executor._work_ids + + # Maximum number of tasks a worker process can execute before + # exiting safely + self.max_tasks_per_child = executor._max_tasks_per_child + + # A dict mapping work ids to _WorkItems e.g. + # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + self.pending_work_items = executor._pending_work_items + + super().__init__() + + def run(self): + # Main loop for the executor manager thread. + + while True: + self.add_call_item_to_queue() + + result_item, is_broken, cause = self.wait_result_broken_or_wakeup() + + if is_broken: + self.terminate_broken(cause) + return + if result_item is not None: + self.process_result_item(result_item) + + process_exited = result_item.exit_pid is not None + if process_exited: + p = self.processes.pop(result_item.exit_pid) + p.join() + + # Delete reference to result_item to avoid keeping references + # while waiting on new results. + del result_item + + if executor := self.executor_reference(): + if process_exited: + with self.shutdown_lock: + executor._adjust_process_count() + else: + executor._idle_worker_semaphore.release() + del executor + + if self.is_shutting_down(): + self.flag_executor_shutting_down() + + # When only canceled futures remain in pending_work_items, our + # next call to wait_result_broken_or_wakeup would hang forever. + # This makes sure we have some running futures or none at all. + self.add_call_item_to_queue() + + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not self.pending_work_items: + self.join_executor_internals() + return + + def add_call_item_to_queue(self): + # Fills call_queue with _WorkItems from pending_work_items. + # This function never blocks. + while True: + if self.call_queue.full(): + return + try: + work_id = self.work_ids_queue.get(block=False) + except queue.Empty: + return + else: + work_item = self.pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + self.call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del self.pending_work_items[work_id] + continue + + def wait_result_broken_or_wakeup(self): + # Wait for a result to be ready in the result_queue while checking + # that all worker processes are still running, or for a wake up + # signal send. The wake up signals come either from new tasks being + # submitted, from the executor being shutdown/gc-ed, or from the + # shutdown of the python interpreter. + result_reader = self.result_queue._reader + assert not self.thread_wakeup._closed + wakeup_reader = self.thread_wakeup._reader + readers = [result_reader, wakeup_reader] + worker_sentinels = [p.sentinel for p in list(self.processes.values())] + ready = mp.connection.wait(readers + worker_sentinels) + + cause = None + is_broken = True + result_item = None + if result_reader in ready: + try: + result_item = result_reader.recv() + is_broken = False + except BaseException as e: + cause = format_exception(type(e), e, e.__traceback__) + + elif wakeup_reader in ready: + is_broken = False + + with self.shutdown_lock: + self.thread_wakeup.clear() + + return result_item, is_broken, cause + + def process_result_item(self, result_item): + # Process the received a result_item. This can be either the PID of a + # worker that exited gracefully or a _ResultItem + + if isinstance(result_item, int): + # Clean shutdown of a worker using its PID + # (avoids marking the executor broken) + assert self.is_shutting_down() + p = self.processes.pop(result_item) + p.join() + if not self.processes: + self.join_executor_internals() + return + else: + # Received a _ResultItem so mark the future as completed. + work_item = self.pending_work_items.pop(result_item.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + + def is_shutting_down(self): + # Check whether we should start shutting down the executor. + executor = self.executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + return (_global_shutdown or executor is None + or executor._shutdown_thread) + + def terminate_broken(self, cause): + # Terminate the executor because it is in a broken state. The cause + # argument can be used to display more information on the error that + # lead the executor into becoming broken. + + # Mark the process pool broken so that submits fail right now. + executor = self.executor_reference() + if executor is not None: + executor._broken = ('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') + executor._shutdown_thread = True + executor = None + + # All pending tasks are to be marked failed with the following + # BrokenProcessPool error + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") + + # Mark pending tasks as failed. + for work_id, work_item in self.pending_work_items.items(): + work_item.future.set_exception(bpe) + # Delete references to object. See issue16284 + del work_item + self.pending_work_items.clear() + + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + for p in self.processes.values(): + p.terminate() + + # Prevent queue writing to a pipe which is no longer read. + # https://github.com/python/cpython/issues/94777 + self.call_queue._reader.close() + + # clean up resources + self.join_executor_internals() + + def flag_executor_shutting_down(self): + # Flag the executor as shutting down and cancel remaining tasks if + # requested as early as possible if it is not gc-ed yet. + executor = self.executor_reference() + if executor is not None: + executor._shutdown_thread = True + # Cancel pending work items if requested. + if executor._cancel_pending_futures: + # Cancel all pending futures and update pending_work_items + # to only have futures that are currently running. + new_pending_work_items = {} + for work_id, work_item in self.pending_work_items.items(): + if not work_item.future.cancel(): + new_pending_work_items[work_id] = work_item + self.pending_work_items = new_pending_work_items + # Drain work_ids_queue since we no longer need to + # add items to the call queue. + while True: + try: + self.work_ids_queue.get_nowait() + except queue.Empty: + break + # Make sure we do this only once to not waste time looping + # on running processes over and over. + executor._cancel_pending_futures = False + + def shutdown_workers(self): + n_children_to_stop = self.get_n_children_alive() + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while (n_sentinels_sent < n_children_to_stop + and self.get_n_children_alive() > 0): + for i in range(n_children_to_stop - n_sentinels_sent): + try: + self.call_queue.put_nowait(None) + n_sentinels_sent += 1 + except queue.Full: + break + + def join_executor_internals(self): + self.shutdown_workers() + # Release the queue's resources as soon as possible. + self.call_queue.close() + self.call_queue.join_thread() + with self.shutdown_lock: + self.thread_wakeup.close() + # If .join() is not called on the created processes then + # some ctx.Queue methods may deadlock on Mac OS X. + for p in self.processes.values(): + p.join() + + def get_n_children_alive(self): + # This is an upper bound on the number of children alive. + return sum(p.is_alive() for p in self.processes.values()) + + +_system_limits_checked = False +_system_limited = None + + +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + import multiprocessing.synchronize + except ImportError: + _system_limited = ( + "This Python build lacks multiprocessing.synchronize, usually due " + "to named semaphores being unavailable on this platform." + ) + raise NotImplementedError(_system_limited) + try: + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermined limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = ("system provides too few semaphores (%d" + " available, 256 necessary)" % nsems_max) + raise NotImplementedError(_system_limited) + + +def _chain_from_iterable_of_lists(iterable): + """ + Specialized implementation of itertools.chain.from_iterable. + Each item in *iterable* should be a list. This function is + careful not to keep references to yielded objects. + """ + for element in iterable: + element.reverse() + while element: + yield element.pop() + + +class BrokenProcessPool(_base.BrokenExecutor): + """ + Raised when a process in a ProcessPoolExecutor terminated abruptly + while a future was in the running state. + """ + + +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None, mp_context=None, + initializer=None, initargs=(), *, max_tasks_per_child=None): + """Initializes a new ProcessPoolExecutor instance. + + Args: + max_workers: The maximum number of processes that can be used to + execute the given calls. If None or not given then as many + worker processes will be created as the machine has processors. + mp_context: A multiprocessing context to launch the workers. This + object should provide SimpleQueue, Queue and Process. Useful + to allow specific multiprocessing start methods. + initializer: A callable used to initialize worker processes. + initargs: A tuple of arguments to pass to the initializer. + max_tasks_per_child: The maximum number of tasks a worker process + can complete before it will exit and be replaced with a fresh + worker process. The default of None means worker process will + live as long as the executor. Requires a non-'fork' mp_context + start method. When given, we default to using 'spawn' if no + mp_context is supplied. + """ + _check_system_limits() + + if max_workers is None: + self._max_workers = os.cpu_count() or 1 + if sys.platform == 'win32': + self._max_workers = min(_MAX_WINDOWS_WORKERS, + self._max_workers) + else: + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + elif (sys.platform == 'win32' and + max_workers > _MAX_WINDOWS_WORKERS): + raise ValueError( + f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") + + self._max_workers = max_workers + + if mp_context is None: + if max_tasks_per_child is not None: + mp_context = mp.get_context("spawn") + else: + mp_context = mp.get_context() + self._mp_context = mp_context + + # https://github.com/python/cpython/issues/90622 + self._safe_to_dynamically_spawn_children = ( + self._mp_context.get_start_method(allow_none=False) != "fork") + + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + self._initializer = initializer + self._initargs = initargs + + if max_tasks_per_child is not None: + if not isinstance(max_tasks_per_child, int): + raise TypeError("max_tasks_per_child must be an integer") + elif max_tasks_per_child <= 0: + raise ValueError("max_tasks_per_child must be >= 1") + if self._mp_context.get_start_method(allow_none=False) == "fork": + # https://github.com/python/cpython/issues/90622 + raise ValueError("max_tasks_per_child is incompatible with" + " the 'fork' multiprocessing start method;" + " supply a different mp_context.") + self._max_tasks_per_child = max_tasks_per_child + + # Management thread + self._executor_manager_thread = None + + # Map of pids to processes + self._processes = {} + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_lock = threading.Lock() + self._idle_worker_semaphore = threading.Semaphore(0) + self._broken = False + self._queue_count = 0 + self._pending_work_items = {} + self._cancel_pending_futures = False + + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of executor_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send wakeup signals to the executor_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + # + # _shutdown_lock must be locked to access _ThreadWakeup. + self._executor_manager_thread_wakeup = _ThreadWakeup() + + # Create communication channels for the executor + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + queue_size = self._max_workers + EXTRA_QUEUED_CALLS + self._call_queue = _SafeQueue( + max_size=queue_size, ctx=self._mp_context, + pending_work_items=self._pending_work_items, + shutdown_lock=self._shutdown_lock, + thread_wakeup=self._executor_manager_thread_wakeup) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True + self._result_queue = mp_context.SimpleQueue() + self._work_ids = queue.Queue() + + def _start_executor_manager_thread(self): + if self._executor_manager_thread is None: + # Start the processes so that their sentinels are known. + if not self._safe_to_dynamically_spawn_children: # ie, using fork. + self._launch_processes() + self._executor_manager_thread = _ExecutorManagerThread(self) + self._executor_manager_thread.start() + _threads_wakeups[self._executor_manager_thread] = \ + self._executor_manager_thread_wakeup + + def _adjust_process_count(self): + # if there's an idle process, we don't need to spawn a new one. + if self._idle_worker_semaphore.acquire(blocking=False): + return + + process_count = len(self._processes) + if process_count < self._max_workers: + # Assertion disabled as this codepath is also used to replace a + # worker that unexpectedly dies, even when using the 'fork' start + # method. That means there is still a potential deadlock bug. If a + # 'fork' mp_context worker dies, we'll be forking a new one when + # we know a thread is running (self._executor_manager_thread). + #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' + self._spawn_process() + + def _launch_processes(self): + # https://github.com/python/cpython/issues/90622 + assert not self._executor_manager_thread, ( + 'Processes cannot be fork()ed after the thread has started, ' + 'deadlock in the child processes could result.') + for _ in range(len(self._processes), self._max_workers): + self._spawn_process() + + def _spawn_process(self): + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs, + self._max_tasks_per_child)) + p.start() + self._processes[p.pid] = p + + def submit(self, fn, /, *args, **kwargs): + with self._shutdown_lock: + if self._broken: + raise BrokenProcessPool(self._broken) + if self._shutdown_thread: + raise RuntimeError('cannot schedule new futures after shutdown') + if _global_shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 + # Wake up queue management thread + self._executor_manager_thread_wakeup.wakeup() + + if self._safe_to_dynamically_spawn_children: + self._adjust_process_count() + self._start_executor_manager_thread() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns an iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: If greater than one, the iterables will be chopped into + chunks of size chunksize and submitted to the process pool. + If set to one, the items in the list will be sent one at a time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if chunksize < 1: + raise ValueError("chunksize must be >= 1.") + + results = super().map(partial(_process_chunk, fn), + _get_chunks(*iterables, chunksize=chunksize), + timeout=timeout) + return _chain_from_iterable_of_lists(results) + + def shutdown(self, wait=True, *, cancel_futures=False): + with self._shutdown_lock: + self._cancel_pending_futures = cancel_futures + self._shutdown_thread = True + if self._executor_manager_thread_wakeup is not None: + # Wake up queue management thread + self._executor_manager_thread_wakeup.wakeup() + + if self._executor_manager_thread is not None and wait: + self._executor_manager_thread.join() + # To reduce the risk of opening too many files, remove references to + # objects that use file descriptors. + self._executor_manager_thread = None + self._call_queue = None + if self._result_queue is not None and wait: + self._result_queue.close() + self._result_queue = None + self._processes = None + self._executor_manager_thread_wakeup = None + + shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/contrib/tools/python3/src/Lib/concurrent/futures/thread.py b/contrib/tools/python3/src/Lib/concurrent/futures/thread.py new file mode 100644 index 0000000000..51c942f51a --- /dev/null +++ b/contrib/tools/python3/src/Lib/concurrent/futures/thread.py @@ -0,0 +1,236 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ThreadPoolExecutor.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +from concurrent.futures import _base +import itertools +import queue +import threading +import types +import weakref +import os + + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False +# Lock that ensures that new workers are not created while the interpreter is +# shutting down. Must be held while mutating _threads_queues and _shutdown. +_global_shutdown_lock = threading.Lock() + +def _python_exit(): + global _shutdown + with _global_shutdown_lock: + _shutdown = True + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() + +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) + +# At fork, reinitialize the `_global_shutdown_lock` lock in the child process +if hasattr(os, 'register_at_fork'): + os.register_at_fork(before=_global_shutdown_lock.acquire, + after_in_child=_global_shutdown_lock._at_fork_reinit, + after_in_parent=_global_shutdown_lock.release) + + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException as exc: + self.future.set_exception(exc) + # Break a reference cycle with the exception 'exc' + self = None + else: + self.future.set_result(result) + + __class_getitem__ = classmethod(types.GenericAlias) + + +def _worker(executor_reference, work_queue, initializer, initargs): + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + executor = executor_reference() + if executor is not None: + executor._initializer_failed() + return + try: + while True: + work_item = work_queue.get(block=True) + if work_item is not None: + work_item.run() + # Delete references to object. See issue16284 + del work_item + + # attempt to increment idle count + executor = executor_reference() + if executor is not None: + executor._idle_semaphore.release() + del executor + continue + + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown = True + # Notice other workers + work_queue.put(None) + return + del executor + except BaseException: + _base.LOGGER.critical('Exception in worker', exc_info=True) + + +class BrokenThreadPool(_base.BrokenExecutor): + """ + Raised when a worker thread in a ThreadPoolExecutor failed initializing. + """ + + +class ThreadPoolExecutor(_base.Executor): + + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().__next__ + + def __init__(self, max_workers=None, thread_name_prefix='', + initializer=None, initargs=()): + """Initializes a new ThreadPoolExecutor instance. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. + initializer: A callable used to initialize worker threads. + initargs: A tuple of arguments to pass to the initializer. + """ + if max_workers is None: + # ThreadPoolExecutor is often used to: + # * CPU bound task which releases GIL + # * I/O bound task (which releases GIL, of course) + # + # We use cpu_count + 4 for both types of tasks. + # But we limit it to 32 to avoid consuming surprisingly large resource + # on many core machine. + max_workers = min(32, (os.cpu_count() or 1) + 4) + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + + self._max_workers = max_workers + self._work_queue = queue.SimpleQueue() + self._idle_semaphore = threading.Semaphore(0) + self._threads = set() + self._broken = False + self._shutdown = False + self._shutdown_lock = threading.Lock() + self._thread_name_prefix = (thread_name_prefix or + ("ThreadPoolExecutor-%d" % self._counter())) + self._initializer = initializer + self._initargs = initargs + + def submit(self, fn, /, *args, **kwargs): + with self._shutdown_lock, _global_shutdown_lock: + if self._broken: + raise BrokenThreadPool(self._broken) + + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + if _shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def _adjust_thread_count(self): + # if idle threads are available, don't spin new threads + if self._idle_semaphore.acquire(timeout=0): + return + + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) + + num_threads = len(self._threads) + if num_threads < self._max_workers: + thread_name = '%s_%d' % (self._thread_name_prefix or self, + num_threads) + t = threading.Thread(name=thread_name, target=_worker, + args=(weakref.ref(self, weakref_cb), + self._work_queue, + self._initializer, + self._initargs)) + t.start() + self._threads.add(t) + _threads_queues[t] = self._work_queue + + def _initializer_failed(self): + with self._shutdown_lock: + self._broken = ('A thread initializer failed, the thread pool ' + 'is not usable anymore') + # Drain work queue and mark pending futures failed + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.set_exception(BrokenThreadPool(self._broken)) + + def shutdown(self, wait=True, *, cancel_futures=False): + with self._shutdown_lock: + self._shutdown = True + if cancel_futures: + # Drain all work items from the queue, and then cancel their + # associated futures. + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.cancel() + + # Send a wake-up to prevent threads calling + # _work_queue.get(block=True) from permanently blocking. + self._work_queue.put(None) + if wait: + for t in self._threads: + t.join() + shutdown.__doc__ = _base.Executor.shutdown.__doc__ |