diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/_threads | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/_threads')
7 files changed, 624 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/_threads/__init__.py b/contrib/python/Twisted/py3/twisted/_threads/__init__.py new file mode 100644 index 0000000000..13c7c7dda5 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/_threads/__init__.py @@ -0,0 +1,24 @@ +# -*- test-case-name: twisted.test.test_paths -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Twisted integration with operating system threads. +""" + + +from ._ithreads import AlreadyQuit, IWorker +from ._memory import createMemoryWorker +from ._pool import pool +from ._team import Team +from ._threadworker import LockWorker, ThreadWorker + +__all__ = [ + "ThreadWorker", + "LockWorker", + "IWorker", + "AlreadyQuit", + "Team", + "createMemoryWorker", + "pool", +] diff --git a/contrib/python/Twisted/py3/twisted/_threads/_convenience.py b/contrib/python/Twisted/py3/twisted/_threads/_convenience.py new file mode 100644 index 0000000000..deff576462 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/_threads/_convenience.py @@ -0,0 +1,43 @@ +# -*- test-case-name: twisted._threads.test.test_convenience -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Common functionality used within the implementation of various workers. +""" + + +from ._ithreads import AlreadyQuit + + +class Quit: + """ + A flag representing whether a worker has been quit. + + @ivar isSet: Whether this flag is set. + @type isSet: L{bool} + """ + + def __init__(self): + """ + Create a L{Quit} un-set. + """ + self.isSet = False + + def set(self): + """ + Set the flag if it has not been set. + + @raise AlreadyQuit: If it has been set. + """ + self.check() + self.isSet = True + + def check(self): + """ + Check if the flag has been set. + + @raise AlreadyQuit: If it has been set. + """ + if self.isSet: + raise AlreadyQuit() diff --git a/contrib/python/Twisted/py3/twisted/_threads/_ithreads.py b/contrib/python/Twisted/py3/twisted/_threads/_ithreads.py new file mode 100644 index 0000000000..cab9135f87 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/_threads/_ithreads.py @@ -0,0 +1,61 @@ +# -*- test-case-name: twisted._threads.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Interfaces related to threads. +""" + + +from typing import Callable + +from zope.interface import Interface + + +class AlreadyQuit(Exception): + """ + This worker worker is dead and cannot execute more instructions. + """ + + +class IWorker(Interface): + """ + A worker that can perform some work concurrently. + + All methods on this interface must be thread-safe. + """ + + def do(task: Callable[[], None]) -> None: + """ + Perform the given task. + + As an interface, this method makes no specific claims about concurrent + execution. An L{IWorker}'s C{do} implementation may defer execution + for later on the same thread, immediately on a different thread, or + some combination of the two. It is valid for a C{do} method to + schedule C{task} in such a way that it may never be executed. + + It is important for some implementations to provide specific properties + with respect to where C{task} is executed, of course, and client code + may rely on a more specific implementation of C{do} than L{IWorker}. + + @param task: a task to call in a thread or other concurrent context. + @type task: 0-argument callable + + @raise AlreadyQuit: if C{quit} has been called. + """ + + def quit(): + """ + Free any resources associated with this L{IWorker} and cause it to + reject all future work. + + @raise AlreadyQuit: if this method has already been called. + """ + + +class IExclusiveWorker(IWorker): + """ + Like L{IWorker}, but with the additional guarantee that the callables + passed to C{do} will not be called exclusively with each other. + """ diff --git a/contrib/python/Twisted/py3/twisted/_threads/_memory.py b/contrib/python/Twisted/py3/twisted/_threads/_memory.py new file mode 100644 index 0000000000..4c56db02ae --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/_threads/_memory.py @@ -0,0 +1,70 @@ +# -*- test-case-name: twisted._threads.test.test_memory -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Implementation of an in-memory worker that defers execution. +""" + + +from zope.interface import implementer + +from . import IWorker +from ._convenience import Quit + +NoMoreWork = object() + + +@implementer(IWorker) +class MemoryWorker: + """ + An L{IWorker} that queues work for later performance. + + @ivar _quit: a flag indicating + @type _quit: L{Quit} + """ + + def __init__(self, pending=list): + """ + Create a L{MemoryWorker}. + """ + self._quit = Quit() + self._pending = pending() + + def do(self, work): + """ + Queue some work for to perform later; see L{createMemoryWorker}. + + @param work: The work to perform. + """ + self._quit.check() + self._pending.append(work) + + def quit(self): + """ + Quit this worker. + """ + self._quit.set() + self._pending.append(NoMoreWork) + + +def createMemoryWorker(): + """ + Create an L{IWorker} that does nothing but defer work, to be performed + later. + + @return: a worker that will enqueue work to perform later, and a callable + that will perform one element of that work. + @rtype: 2-L{tuple} of (L{IWorker}, L{callable}) + """ + + def perform(): + if not worker._pending: + return False + if worker._pending[0] is NoMoreWork: + return False + worker._pending.pop(0)() + return True + + worker = MemoryWorker() + return (worker, perform) diff --git a/contrib/python/Twisted/py3/twisted/_threads/_pool.py b/contrib/python/Twisted/py3/twisted/_threads/_pool.py new file mode 100644 index 0000000000..99c055d240 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/_threads/_pool.py @@ -0,0 +1,73 @@ +# -*- test-case-name: twisted._threads.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Top level thread pool interface, used to implement +L{twisted.python.threadpool}. +""" + + +from queue import Queue +from threading import Lock, Thread, local as LocalStorage +from typing import Callable, Optional + +from typing_extensions import Protocol + +from twisted.python.log import err +from ._ithreads import IWorker +from ._team import Team +from ._threadworker import LockWorker, ThreadWorker + + +class _ThreadFactory(Protocol): + def __call__(self, *, target: Callable[..., object]) -> Thread: + ... + + +def pool( + currentLimit: Callable[[], int], threadFactory: _ThreadFactory = Thread +) -> Team: + """ + Construct a L{Team} that spawns threads as a thread pool, with the given + limiting function. + + @note: Future maintainers: while the public API for the eventual move to + twisted.threads should look I{something} like this, and while this + function is necessary to implement the API described by + L{twisted.python.threadpool}, I am starting to think the idea of a hard + upper limit on threadpool size is just bad (turning memory performance + issues into correctness issues well before we run into memory + pressure), and instead we should build something with reactor + integration for slowly releasing idle threads when they're not needed + and I{rate} limiting the creation of new threads rather than just + hard-capping it. + + @param currentLimit: a callable that returns the current limit on the + number of workers that the returned L{Team} should create; if it + already has more workers than that value, no new workers will be + created. + @type currentLimit: 0-argument callable returning L{int} + + @param threadFactory: Factory that, when given a C{target} keyword argument, + returns a L{threading.Thread} that will run that target. + @type threadFactory: callable returning a L{threading.Thread} + + @return: a new L{Team}. + """ + + def startThread(target: Callable[..., object]) -> None: + return threadFactory(target=target).start() + + def limitedWorkerCreator() -> Optional[IWorker]: + stats = team.statistics() + if stats.busyWorkerCount + stats.idleWorkerCount >= currentLimit(): + return None + return ThreadWorker(startThread, Queue()) + + team = Team( + coordinator=LockWorker(Lock(), LocalStorage()), + createWorker=limitedWorkerCreator, + logException=err, + ) + return team diff --git a/contrib/python/Twisted/py3/twisted/_threads/_team.py b/contrib/python/Twisted/py3/twisted/_threads/_team.py new file mode 100644 index 0000000000..d15ae04242 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/_threads/_team.py @@ -0,0 +1,232 @@ +# -*- test-case-name: twisted._threads.test.test_team -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Implementation of a L{Team} of workers; a thread-pool that can allocate work to +workers. +""" +from __future__ import annotations + +from collections import deque +from typing import Callable, Optional, Set + +from zope.interface import implementer + +from . import IWorker +from ._convenience import Quit +from ._ithreads import IExclusiveWorker + + +class Statistics: + """ + Statistics about a L{Team}'s current activity. + + @ivar idleWorkerCount: The number of idle workers. + @type idleWorkerCount: L{int} + + @ivar busyWorkerCount: The number of busy workers. + @type busyWorkerCount: L{int} + + @ivar backloggedWorkCount: The number of work items passed to L{Team.do} + which have not yet been sent to a worker to be performed because not + enough workers are available. + @type backloggedWorkCount: L{int} + """ + + def __init__( + self, idleWorkerCount: int, busyWorkerCount: int, backloggedWorkCount: int + ) -> None: + self.idleWorkerCount = idleWorkerCount + self.busyWorkerCount = busyWorkerCount + self.backloggedWorkCount = backloggedWorkCount + + +@implementer(IWorker) +class Team: + """ + A composite L{IWorker} implementation. + + @ivar _quit: A L{Quit} flag indicating whether this L{Team} has been quit + yet. This may be set by an arbitrary thread since L{Team.quit} may be + called from anywhere. + + @ivar _coordinator: the L{IExclusiveWorker} coordinating access to this + L{Team}'s internal resources. + + @ivar _createWorker: a callable that will create new workers. + + @ivar _logException: a 0-argument callable called in an exception context + when there is an unhandled error from a task passed to L{Team.do} + + @ivar _idle: a L{set} of idle workers. + + @ivar _busyCount: the number of workers currently busy. + + @ivar _pending: a C{deque} of tasks - that is, 0-argument callables passed + to L{Team.do} - that are outstanding. + + @ivar _shouldQuitCoordinator: A flag indicating that the coordinator should + be quit at the next available opportunity. Unlike L{Team._quit}, this + flag is only set by the coordinator. + + @ivar _toShrink: the number of workers to shrink this L{Team} by at the + next available opportunity; set in the coordinator. + """ + + def __init__( + self, + coordinator: IExclusiveWorker, + createWorker: Callable[[], Optional[IWorker]], + logException: Callable[[], None], + ): + """ + @param coordinator: an L{IExclusiveWorker} which will coordinate access + to resources on this L{Team}; that is to say, an + L{IExclusiveWorker} whose C{do} method ensures that its given work + will be executed in a mutually exclusive context, not in parallel + with other work enqueued by C{do} (although possibly in parallel + with the caller). + + @param createWorker: A 0-argument callable that will create an + L{IWorker} to perform work. + + @param logException: A 0-argument callable called in an exception + context when the work passed to C{do} raises an exception. + """ + self._quit = Quit() + self._coordinator = coordinator + self._createWorker = createWorker + self._logException = logException + + # Don't touch these except from the coordinator. + self._idle: Set[IWorker] = set() + self._busyCount = 0 + self._pending: "deque[Callable[..., object]]" = deque() + self._shouldQuitCoordinator = False + self._toShrink = 0 + + def statistics(self) -> Statistics: + """ + Gather information on the current status of this L{Team}. + + @return: a L{Statistics} describing the current state of this L{Team}. + """ + return Statistics(len(self._idle), self._busyCount, len(self._pending)) + + def grow(self, n: int) -> None: + """ + Increase the the number of idle workers by C{n}. + + @param n: The number of new idle workers to create. + @type n: L{int} + """ + self._quit.check() + + @self._coordinator.do + def createOneWorker() -> None: + for x in range(n): + worker = self._createWorker() + if worker is None: + return + self._recycleWorker(worker) + + def shrink(self, n: Optional[int] = None) -> None: + """ + Decrease the number of idle workers by C{n}. + + @param n: The number of idle workers to shut down, or L{None} (or + unspecified) to shut down all workers. + @type n: L{int} or L{None} + """ + self._quit.check() + self._coordinator.do(lambda: self._quitIdlers(n)) + + def _quitIdlers(self, n: Optional[int] = None) -> None: + """ + The implmentation of C{shrink}, performed by the coordinator worker. + + @param n: see L{Team.shrink} + """ + if n is None: + n = len(self._idle) + self._busyCount + for x in range(n): + if self._idle: + self._idle.pop().quit() + else: + self._toShrink += 1 + if self._shouldQuitCoordinator and self._busyCount == 0: + self._coordinator.quit() + + def do(self, task: Callable[[], None]) -> None: + """ + Perform some work in a worker created by C{createWorker}. + + @param task: the callable to run + """ + self._quit.check() + self._coordinator.do(lambda: self._coordinateThisTask(task)) + + def _coordinateThisTask(self, task: Callable[..., object]) -> None: + """ + Select a worker to dispatch to, either an idle one or a new one, and + perform it. + + This method should run on the coordinator worker. + + @param task: the task to dispatch + @type task: 0-argument callable + """ + worker = self._idle.pop() if self._idle else self._createWorker() + if worker is None: + # The createWorker method may return None if we're out of resources + # to create workers. + self._pending.append(task) + return + not_none_worker = worker + self._busyCount += 1 + + @worker.do + def doWork() -> None: + try: + task() + except BaseException: + self._logException() + + @self._coordinator.do + def idleAndPending() -> None: + self._busyCount -= 1 + self._recycleWorker(not_none_worker) + + def _recycleWorker(self, worker: IWorker) -> None: + """ + Called only from coordinator. + + Recycle the given worker into the idle pool. + + @param worker: a worker created by C{createWorker} and now idle. + @type worker: L{IWorker} + """ + self._idle.add(worker) + if self._pending: + # Re-try the first enqueued thing. + # (Explicitly do _not_ honor _quit.) + self._coordinateThisTask(self._pending.popleft()) + elif self._shouldQuitCoordinator: + self._quitIdlers() + elif self._toShrink > 0: + self._toShrink -= 1 + self._idle.remove(worker) + worker.quit() + + def quit(self) -> None: + """ + Stop doing work and shut down all idle workers. + """ + self._quit.set() + # In case all the workers are idle when we do this. + + @self._coordinator.do + def startFinishing() -> None: + self._shouldQuitCoordinator = True + self._quitIdlers() diff --git a/contrib/python/Twisted/py3/twisted/_threads/_threadworker.py b/contrib/python/Twisted/py3/twisted/_threads/_threadworker.py new file mode 100644 index 0000000000..e7ffc09758 --- /dev/null +++ b/contrib/python/Twisted/py3/twisted/_threads/_threadworker.py @@ -0,0 +1,121 @@ +# -*- test-case-name: twisted._threads.test.test_threadworker -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Implementation of an L{IWorker} based on native threads and queues. +""" + + +from typing import Callable + +from zope.interface import implementer + +from ._convenience import Quit +from ._ithreads import IExclusiveWorker + +_stop = object() + + +@implementer(IExclusiveWorker) +class ThreadWorker: + """ + An L{IExclusiveWorker} implemented based on a single thread and a queue. + + This worker ensures exclusivity (i.e. it is an L{IExclusiveWorker} and not + an L{IWorker}) by performing all of the work passed to C{do} on the I{same} + thread. + """ + + def __init__(self, startThread, queue): + """ + Create a L{ThreadWorker} with a function to start a thread and a queue + to use to communicate with that thread. + + @param startThread: a callable that takes a callable to run in another + thread. + @type startThread: callable taking a 0-argument callable and returning + nothing. + + @param queue: A L{Queue} to use to give tasks to the thread created by + C{startThread}. + @type queue: L{Queue} + """ + self._q = queue + self._hasQuit = Quit() + + def work(): + for task in iter(queue.get, _stop): + task() + + startThread(work) + + def do(self, task: Callable[[], None]) -> None: + """ + Perform the given task on the thread owned by this L{ThreadWorker}. + + @param task: the function to call on a thread. + """ + self._hasQuit.check() + self._q.put(task) + + def quit(self): + """ + Reject all future work and stop the thread started by C{__init__}. + """ + # Reject all future work. Set this _before_ enqueueing _stop, so + # that no work is ever enqueued _after_ _stop. + self._hasQuit.set() + self._q.put(_stop) + + +@implementer(IExclusiveWorker) +class LockWorker: + """ + An L{IWorker} implemented based on a mutual-exclusion lock. + """ + + def __init__(self, lock, local): + """ + @param lock: A mutual-exclusion lock, with C{acquire} and C{release} + methods. + @type lock: L{threading.Lock} + + @param local: Local storage. + @type local: L{threading.local} + """ + self._quit = Quit() + self._lock = lock + self._local = local + + def do(self, work: Callable[[], None]) -> None: + """ + Do the given work on this thread, with the mutex acquired. If this is + called re-entrantly, return and wait for the outer invocation to do the + work. + + @param work: the work to do with the lock held. + """ + lock = self._lock + local = self._local + self._quit.check() + working = getattr(local, "working", None) + if working is None: + working = local.working = [] + working.append(work) + lock.acquire() + try: + while working: + working.pop(0)() + finally: + lock.release() + local.working = None + else: + working.append(work) + + def quit(self): + """ + Quit this L{LockWorker}. + """ + self._quit.set() + self._lock = None |