aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/_threads
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/_threads
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/_threads')
-rw-r--r--contrib/python/Twisted/py3/twisted/_threads/__init__.py24
-rw-r--r--contrib/python/Twisted/py3/twisted/_threads/_convenience.py43
-rw-r--r--contrib/python/Twisted/py3/twisted/_threads/_ithreads.py61
-rw-r--r--contrib/python/Twisted/py3/twisted/_threads/_memory.py70
-rw-r--r--contrib/python/Twisted/py3/twisted/_threads/_pool.py73
-rw-r--r--contrib/python/Twisted/py3/twisted/_threads/_team.py232
-rw-r--r--contrib/python/Twisted/py3/twisted/_threads/_threadworker.py121
7 files changed, 624 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/_threads/__init__.py b/contrib/python/Twisted/py3/twisted/_threads/__init__.py
new file mode 100644
index 0000000000..13c7c7dda5
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/_threads/__init__.py
@@ -0,0 +1,24 @@
+# -*- test-case-name: twisted.test.test_paths -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Twisted integration with operating system threads.
+"""
+
+
+from ._ithreads import AlreadyQuit, IWorker
+from ._memory import createMemoryWorker
+from ._pool import pool
+from ._team import Team
+from ._threadworker import LockWorker, ThreadWorker
+
+__all__ = [
+ "ThreadWorker",
+ "LockWorker",
+ "IWorker",
+ "AlreadyQuit",
+ "Team",
+ "createMemoryWorker",
+ "pool",
+]
diff --git a/contrib/python/Twisted/py3/twisted/_threads/_convenience.py b/contrib/python/Twisted/py3/twisted/_threads/_convenience.py
new file mode 100644
index 0000000000..deff576462
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/_threads/_convenience.py
@@ -0,0 +1,43 @@
+# -*- test-case-name: twisted._threads.test.test_convenience -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Common functionality used within the implementation of various workers.
+"""
+
+
+from ._ithreads import AlreadyQuit
+
+
+class Quit:
+ """
+ A flag representing whether a worker has been quit.
+
+ @ivar isSet: Whether this flag is set.
+ @type isSet: L{bool}
+ """
+
+ def __init__(self):
+ """
+ Create a L{Quit} un-set.
+ """
+ self.isSet = False
+
+ def set(self):
+ """
+ Set the flag if it has not been set.
+
+ @raise AlreadyQuit: If it has been set.
+ """
+ self.check()
+ self.isSet = True
+
+ def check(self):
+ """
+ Check if the flag has been set.
+
+ @raise AlreadyQuit: If it has been set.
+ """
+ if self.isSet:
+ raise AlreadyQuit()
diff --git a/contrib/python/Twisted/py3/twisted/_threads/_ithreads.py b/contrib/python/Twisted/py3/twisted/_threads/_ithreads.py
new file mode 100644
index 0000000000..cab9135f87
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/_threads/_ithreads.py
@@ -0,0 +1,61 @@
+# -*- test-case-name: twisted._threads.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Interfaces related to threads.
+"""
+
+
+from typing import Callable
+
+from zope.interface import Interface
+
+
+class AlreadyQuit(Exception):
+ """
+ This worker worker is dead and cannot execute more instructions.
+ """
+
+
+class IWorker(Interface):
+ """
+ A worker that can perform some work concurrently.
+
+ All methods on this interface must be thread-safe.
+ """
+
+ def do(task: Callable[[], None]) -> None:
+ """
+ Perform the given task.
+
+ As an interface, this method makes no specific claims about concurrent
+ execution. An L{IWorker}'s C{do} implementation may defer execution
+ for later on the same thread, immediately on a different thread, or
+ some combination of the two. It is valid for a C{do} method to
+ schedule C{task} in such a way that it may never be executed.
+
+ It is important for some implementations to provide specific properties
+ with respect to where C{task} is executed, of course, and client code
+ may rely on a more specific implementation of C{do} than L{IWorker}.
+
+ @param task: a task to call in a thread or other concurrent context.
+ @type task: 0-argument callable
+
+ @raise AlreadyQuit: if C{quit} has been called.
+ """
+
+ def quit():
+ """
+ Free any resources associated with this L{IWorker} and cause it to
+ reject all future work.
+
+ @raise AlreadyQuit: if this method has already been called.
+ """
+
+
+class IExclusiveWorker(IWorker):
+ """
+ Like L{IWorker}, but with the additional guarantee that the callables
+ passed to C{do} will not be called exclusively with each other.
+ """
diff --git a/contrib/python/Twisted/py3/twisted/_threads/_memory.py b/contrib/python/Twisted/py3/twisted/_threads/_memory.py
new file mode 100644
index 0000000000..4c56db02ae
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/_threads/_memory.py
@@ -0,0 +1,70 @@
+# -*- test-case-name: twisted._threads.test.test_memory -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Implementation of an in-memory worker that defers execution.
+"""
+
+
+from zope.interface import implementer
+
+from . import IWorker
+from ._convenience import Quit
+
+NoMoreWork = object()
+
+
+@implementer(IWorker)
+class MemoryWorker:
+ """
+ An L{IWorker} that queues work for later performance.
+
+ @ivar _quit: a flag indicating
+ @type _quit: L{Quit}
+ """
+
+ def __init__(self, pending=list):
+ """
+ Create a L{MemoryWorker}.
+ """
+ self._quit = Quit()
+ self._pending = pending()
+
+ def do(self, work):
+ """
+ Queue some work for to perform later; see L{createMemoryWorker}.
+
+ @param work: The work to perform.
+ """
+ self._quit.check()
+ self._pending.append(work)
+
+ def quit(self):
+ """
+ Quit this worker.
+ """
+ self._quit.set()
+ self._pending.append(NoMoreWork)
+
+
+def createMemoryWorker():
+ """
+ Create an L{IWorker} that does nothing but defer work, to be performed
+ later.
+
+ @return: a worker that will enqueue work to perform later, and a callable
+ that will perform one element of that work.
+ @rtype: 2-L{tuple} of (L{IWorker}, L{callable})
+ """
+
+ def perform():
+ if not worker._pending:
+ return False
+ if worker._pending[0] is NoMoreWork:
+ return False
+ worker._pending.pop(0)()
+ return True
+
+ worker = MemoryWorker()
+ return (worker, perform)
diff --git a/contrib/python/Twisted/py3/twisted/_threads/_pool.py b/contrib/python/Twisted/py3/twisted/_threads/_pool.py
new file mode 100644
index 0000000000..99c055d240
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/_threads/_pool.py
@@ -0,0 +1,73 @@
+# -*- test-case-name: twisted._threads.test -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Top level thread pool interface, used to implement
+L{twisted.python.threadpool}.
+"""
+
+
+from queue import Queue
+from threading import Lock, Thread, local as LocalStorage
+from typing import Callable, Optional
+
+from typing_extensions import Protocol
+
+from twisted.python.log import err
+from ._ithreads import IWorker
+from ._team import Team
+from ._threadworker import LockWorker, ThreadWorker
+
+
+class _ThreadFactory(Protocol):
+ def __call__(self, *, target: Callable[..., object]) -> Thread:
+ ...
+
+
+def pool(
+ currentLimit: Callable[[], int], threadFactory: _ThreadFactory = Thread
+) -> Team:
+ """
+ Construct a L{Team} that spawns threads as a thread pool, with the given
+ limiting function.
+
+ @note: Future maintainers: while the public API for the eventual move to
+ twisted.threads should look I{something} like this, and while this
+ function is necessary to implement the API described by
+ L{twisted.python.threadpool}, I am starting to think the idea of a hard
+ upper limit on threadpool size is just bad (turning memory performance
+ issues into correctness issues well before we run into memory
+ pressure), and instead we should build something with reactor
+ integration for slowly releasing idle threads when they're not needed
+ and I{rate} limiting the creation of new threads rather than just
+ hard-capping it.
+
+ @param currentLimit: a callable that returns the current limit on the
+ number of workers that the returned L{Team} should create; if it
+ already has more workers than that value, no new workers will be
+ created.
+ @type currentLimit: 0-argument callable returning L{int}
+
+ @param threadFactory: Factory that, when given a C{target} keyword argument,
+ returns a L{threading.Thread} that will run that target.
+ @type threadFactory: callable returning a L{threading.Thread}
+
+ @return: a new L{Team}.
+ """
+
+ def startThread(target: Callable[..., object]) -> None:
+ return threadFactory(target=target).start()
+
+ def limitedWorkerCreator() -> Optional[IWorker]:
+ stats = team.statistics()
+ if stats.busyWorkerCount + stats.idleWorkerCount >= currentLimit():
+ return None
+ return ThreadWorker(startThread, Queue())
+
+ team = Team(
+ coordinator=LockWorker(Lock(), LocalStorage()),
+ createWorker=limitedWorkerCreator,
+ logException=err,
+ )
+ return team
diff --git a/contrib/python/Twisted/py3/twisted/_threads/_team.py b/contrib/python/Twisted/py3/twisted/_threads/_team.py
new file mode 100644
index 0000000000..d15ae04242
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/_threads/_team.py
@@ -0,0 +1,232 @@
+# -*- test-case-name: twisted._threads.test.test_team -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Implementation of a L{Team} of workers; a thread-pool that can allocate work to
+workers.
+"""
+from __future__ import annotations
+
+from collections import deque
+from typing import Callable, Optional, Set
+
+from zope.interface import implementer
+
+from . import IWorker
+from ._convenience import Quit
+from ._ithreads import IExclusiveWorker
+
+
+class Statistics:
+ """
+ Statistics about a L{Team}'s current activity.
+
+ @ivar idleWorkerCount: The number of idle workers.
+ @type idleWorkerCount: L{int}
+
+ @ivar busyWorkerCount: The number of busy workers.
+ @type busyWorkerCount: L{int}
+
+ @ivar backloggedWorkCount: The number of work items passed to L{Team.do}
+ which have not yet been sent to a worker to be performed because not
+ enough workers are available.
+ @type backloggedWorkCount: L{int}
+ """
+
+ def __init__(
+ self, idleWorkerCount: int, busyWorkerCount: int, backloggedWorkCount: int
+ ) -> None:
+ self.idleWorkerCount = idleWorkerCount
+ self.busyWorkerCount = busyWorkerCount
+ self.backloggedWorkCount = backloggedWorkCount
+
+
+@implementer(IWorker)
+class Team:
+ """
+ A composite L{IWorker} implementation.
+
+ @ivar _quit: A L{Quit} flag indicating whether this L{Team} has been quit
+ yet. This may be set by an arbitrary thread since L{Team.quit} may be
+ called from anywhere.
+
+ @ivar _coordinator: the L{IExclusiveWorker} coordinating access to this
+ L{Team}'s internal resources.
+
+ @ivar _createWorker: a callable that will create new workers.
+
+ @ivar _logException: a 0-argument callable called in an exception context
+ when there is an unhandled error from a task passed to L{Team.do}
+
+ @ivar _idle: a L{set} of idle workers.
+
+ @ivar _busyCount: the number of workers currently busy.
+
+ @ivar _pending: a C{deque} of tasks - that is, 0-argument callables passed
+ to L{Team.do} - that are outstanding.
+
+ @ivar _shouldQuitCoordinator: A flag indicating that the coordinator should
+ be quit at the next available opportunity. Unlike L{Team._quit}, this
+ flag is only set by the coordinator.
+
+ @ivar _toShrink: the number of workers to shrink this L{Team} by at the
+ next available opportunity; set in the coordinator.
+ """
+
+ def __init__(
+ self,
+ coordinator: IExclusiveWorker,
+ createWorker: Callable[[], Optional[IWorker]],
+ logException: Callable[[], None],
+ ):
+ """
+ @param coordinator: an L{IExclusiveWorker} which will coordinate access
+ to resources on this L{Team}; that is to say, an
+ L{IExclusiveWorker} whose C{do} method ensures that its given work
+ will be executed in a mutually exclusive context, not in parallel
+ with other work enqueued by C{do} (although possibly in parallel
+ with the caller).
+
+ @param createWorker: A 0-argument callable that will create an
+ L{IWorker} to perform work.
+
+ @param logException: A 0-argument callable called in an exception
+ context when the work passed to C{do} raises an exception.
+ """
+ self._quit = Quit()
+ self._coordinator = coordinator
+ self._createWorker = createWorker
+ self._logException = logException
+
+ # Don't touch these except from the coordinator.
+ self._idle: Set[IWorker] = set()
+ self._busyCount = 0
+ self._pending: "deque[Callable[..., object]]" = deque()
+ self._shouldQuitCoordinator = False
+ self._toShrink = 0
+
+ def statistics(self) -> Statistics:
+ """
+ Gather information on the current status of this L{Team}.
+
+ @return: a L{Statistics} describing the current state of this L{Team}.
+ """
+ return Statistics(len(self._idle), self._busyCount, len(self._pending))
+
+ def grow(self, n: int) -> None:
+ """
+ Increase the the number of idle workers by C{n}.
+
+ @param n: The number of new idle workers to create.
+ @type n: L{int}
+ """
+ self._quit.check()
+
+ @self._coordinator.do
+ def createOneWorker() -> None:
+ for x in range(n):
+ worker = self._createWorker()
+ if worker is None:
+ return
+ self._recycleWorker(worker)
+
+ def shrink(self, n: Optional[int] = None) -> None:
+ """
+ Decrease the number of idle workers by C{n}.
+
+ @param n: The number of idle workers to shut down, or L{None} (or
+ unspecified) to shut down all workers.
+ @type n: L{int} or L{None}
+ """
+ self._quit.check()
+ self._coordinator.do(lambda: self._quitIdlers(n))
+
+ def _quitIdlers(self, n: Optional[int] = None) -> None:
+ """
+ The implmentation of C{shrink}, performed by the coordinator worker.
+
+ @param n: see L{Team.shrink}
+ """
+ if n is None:
+ n = len(self._idle) + self._busyCount
+ for x in range(n):
+ if self._idle:
+ self._idle.pop().quit()
+ else:
+ self._toShrink += 1
+ if self._shouldQuitCoordinator and self._busyCount == 0:
+ self._coordinator.quit()
+
+ def do(self, task: Callable[[], None]) -> None:
+ """
+ Perform some work in a worker created by C{createWorker}.
+
+ @param task: the callable to run
+ """
+ self._quit.check()
+ self._coordinator.do(lambda: self._coordinateThisTask(task))
+
+ def _coordinateThisTask(self, task: Callable[..., object]) -> None:
+ """
+ Select a worker to dispatch to, either an idle one or a new one, and
+ perform it.
+
+ This method should run on the coordinator worker.
+
+ @param task: the task to dispatch
+ @type task: 0-argument callable
+ """
+ worker = self._idle.pop() if self._idle else self._createWorker()
+ if worker is None:
+ # The createWorker method may return None if we're out of resources
+ # to create workers.
+ self._pending.append(task)
+ return
+ not_none_worker = worker
+ self._busyCount += 1
+
+ @worker.do
+ def doWork() -> None:
+ try:
+ task()
+ except BaseException:
+ self._logException()
+
+ @self._coordinator.do
+ def idleAndPending() -> None:
+ self._busyCount -= 1
+ self._recycleWorker(not_none_worker)
+
+ def _recycleWorker(self, worker: IWorker) -> None:
+ """
+ Called only from coordinator.
+
+ Recycle the given worker into the idle pool.
+
+ @param worker: a worker created by C{createWorker} and now idle.
+ @type worker: L{IWorker}
+ """
+ self._idle.add(worker)
+ if self._pending:
+ # Re-try the first enqueued thing.
+ # (Explicitly do _not_ honor _quit.)
+ self._coordinateThisTask(self._pending.popleft())
+ elif self._shouldQuitCoordinator:
+ self._quitIdlers()
+ elif self._toShrink > 0:
+ self._toShrink -= 1
+ self._idle.remove(worker)
+ worker.quit()
+
+ def quit(self) -> None:
+ """
+ Stop doing work and shut down all idle workers.
+ """
+ self._quit.set()
+ # In case all the workers are idle when we do this.
+
+ @self._coordinator.do
+ def startFinishing() -> None:
+ self._shouldQuitCoordinator = True
+ self._quitIdlers()
diff --git a/contrib/python/Twisted/py3/twisted/_threads/_threadworker.py b/contrib/python/Twisted/py3/twisted/_threads/_threadworker.py
new file mode 100644
index 0000000000..e7ffc09758
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/_threads/_threadworker.py
@@ -0,0 +1,121 @@
+# -*- test-case-name: twisted._threads.test.test_threadworker -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Implementation of an L{IWorker} based on native threads and queues.
+"""
+
+
+from typing import Callable
+
+from zope.interface import implementer
+
+from ._convenience import Quit
+from ._ithreads import IExclusiveWorker
+
+_stop = object()
+
+
+@implementer(IExclusiveWorker)
+class ThreadWorker:
+ """
+ An L{IExclusiveWorker} implemented based on a single thread and a queue.
+
+ This worker ensures exclusivity (i.e. it is an L{IExclusiveWorker} and not
+ an L{IWorker}) by performing all of the work passed to C{do} on the I{same}
+ thread.
+ """
+
+ def __init__(self, startThread, queue):
+ """
+ Create a L{ThreadWorker} with a function to start a thread and a queue
+ to use to communicate with that thread.
+
+ @param startThread: a callable that takes a callable to run in another
+ thread.
+ @type startThread: callable taking a 0-argument callable and returning
+ nothing.
+
+ @param queue: A L{Queue} to use to give tasks to the thread created by
+ C{startThread}.
+ @type queue: L{Queue}
+ """
+ self._q = queue
+ self._hasQuit = Quit()
+
+ def work():
+ for task in iter(queue.get, _stop):
+ task()
+
+ startThread(work)
+
+ def do(self, task: Callable[[], None]) -> None:
+ """
+ Perform the given task on the thread owned by this L{ThreadWorker}.
+
+ @param task: the function to call on a thread.
+ """
+ self._hasQuit.check()
+ self._q.put(task)
+
+ def quit(self):
+ """
+ Reject all future work and stop the thread started by C{__init__}.
+ """
+ # Reject all future work. Set this _before_ enqueueing _stop, so
+ # that no work is ever enqueued _after_ _stop.
+ self._hasQuit.set()
+ self._q.put(_stop)
+
+
+@implementer(IExclusiveWorker)
+class LockWorker:
+ """
+ An L{IWorker} implemented based on a mutual-exclusion lock.
+ """
+
+ def __init__(self, lock, local):
+ """
+ @param lock: A mutual-exclusion lock, with C{acquire} and C{release}
+ methods.
+ @type lock: L{threading.Lock}
+
+ @param local: Local storage.
+ @type local: L{threading.local}
+ """
+ self._quit = Quit()
+ self._lock = lock
+ self._local = local
+
+ def do(self, work: Callable[[], None]) -> None:
+ """
+ Do the given work on this thread, with the mutex acquired. If this is
+ called re-entrantly, return and wait for the outer invocation to do the
+ work.
+
+ @param work: the work to do with the lock held.
+ """
+ lock = self._lock
+ local = self._local
+ self._quit.check()
+ working = getattr(local, "working", None)
+ if working is None:
+ working = local.working = []
+ working.append(work)
+ lock.acquire()
+ try:
+ while working:
+ working.pop(0)()
+ finally:
+ lock.release()
+ local.working = None
+ else:
+ working.append(work)
+
+ def quit(self):
+ """
+ Quit this L{LockWorker}.
+ """
+ self._quit.set()
+ self._lock = None