diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/_threads/_team.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/_threads/_team.py')
-rw-r--r-- | contrib/python/Twisted/py2/twisted/_threads/_team.py | 231 |
1 files changed, 231 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/_threads/_team.py b/contrib/python/Twisted/py2/twisted/_threads/_team.py new file mode 100644 index 0000000000..83b777abd3 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/_threads/_team.py @@ -0,0 +1,231 @@ +# -*- test-case-name: twisted._threads.test.test_team -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Implementation of a L{Team} of workers; a thread-pool that can allocate work to +workers. +""" + +from __future__ import absolute_import, division, print_function + +from collections import deque +from zope.interface import implementer + +from . import IWorker +from ._convenience import Quit + + + +class Statistics(object): + """ + Statistics about a L{Team}'s current activity. + + @ivar idleWorkerCount: The number of idle workers. + @type idleWorkerCount: L{int} + + @ivar busyWorkerCount: The number of busy workers. + @type busyWorkerCount: L{int} + + @ivar backloggedWorkCount: The number of work items passed to L{Team.do} + which have not yet been sent to a worker to be performed because not + enough workers are available. + @type backloggedWorkCount: L{int} + """ + + def __init__(self, idleWorkerCount, busyWorkerCount, + backloggedWorkCount): + self.idleWorkerCount = idleWorkerCount + self.busyWorkerCount = busyWorkerCount + self.backloggedWorkCount = backloggedWorkCount + + + +@implementer(IWorker) +class Team(object): + """ + A composite L{IWorker} implementation. + + @ivar _quit: A L{Quit} flag indicating whether this L{Team} has been quit + yet. This may be set by an arbitrary thread since L{Team.quit} may be + called from anywhere. + + @ivar _coordinator: the L{IExclusiveWorker} coordinating access to this + L{Team}'s internal resources. + + @ivar _createWorker: a callable that will create new workers. + + @ivar _logException: a 0-argument callable called in an exception context + when there is an unhandled error from a task passed to L{Team.do} + + @ivar _idle: a L{set} of idle workers. + + @ivar _busyCount: the number of workers currently busy. + + @ivar _pending: a C{deque} of tasks - that is, 0-argument callables passed + to L{Team.do} - that are outstanding. + + @ivar _shouldQuitCoordinator: A flag indicating that the coordinator should + be quit at the next available opportunity. Unlike L{Team._quit}, this + flag is only set by the coordinator. + + @ivar _toShrink: the number of workers to shrink this L{Team} by at the + next available opportunity; set in the coordinator. + """ + + def __init__(self, coordinator, createWorker, logException): + """ + @param coordinator: an L{IExclusiveWorker} which will coordinate access + to resources on this L{Team}; that is to say, an + L{IExclusiveWorker} whose C{do} method ensures that its given work + will be executed in a mutually exclusive context, not in parallel + with other work enqueued by C{do} (although possibly in parallel + with the caller). + + @param createWorker: A 0-argument callable that will create an + L{IWorker} to perform work. + + @param logException: A 0-argument callable called in an exception + context when the work passed to C{do} raises an exception. + """ + self._quit = Quit() + self._coordinator = coordinator + self._createWorker = createWorker + self._logException = logException + + # Don't touch these except from the coordinator. + self._idle = set() + self._busyCount = 0 + self._pending = deque() + self._shouldQuitCoordinator = False + self._toShrink = 0 + + + def statistics(self): + """ + Gather information on the current status of this L{Team}. + + @return: a L{Statistics} describing the current state of this L{Team}. + """ + return Statistics(len(self._idle), self._busyCount, len(self._pending)) + + + def grow(self, n): + """ + Increase the the number of idle workers by C{n}. + + @param n: The number of new idle workers to create. + @type n: L{int} + """ + self._quit.check() + @self._coordinator.do + def createOneWorker(): + for x in range(n): + worker = self._createWorker() + if worker is None: + return + self._recycleWorker(worker) + + + def shrink(self, n=None): + """ + Decrease the number of idle workers by C{n}. + + @param n: The number of idle workers to shut down, or L{None} (or + unspecified) to shut down all workers. + @type n: L{int} or L{None} + """ + self._quit.check() + self._coordinator.do(lambda: self._quitIdlers(n)) + + + def _quitIdlers(self, n=None): + """ + The implmentation of C{shrink}, performed by the coordinator worker. + + @param n: see L{Team.shrink} + """ + if n is None: + n = len(self._idle) + self._busyCount + for x in range(n): + if self._idle: + self._idle.pop().quit() + else: + self._toShrink += 1 + if self._shouldQuitCoordinator and self._busyCount == 0: + self._coordinator.quit() + + + def do(self, task): + """ + Perform some work in a worker created by C{createWorker}. + + @param task: the callable to run + """ + self._quit.check() + self._coordinator.do(lambda: self._coordinateThisTask(task)) + + + def _coordinateThisTask(self, task): + """ + Select a worker to dispatch to, either an idle one or a new one, and + perform it. + + This method should run on the coordinator worker. + + @param task: the task to dispatch + @type task: 0-argument callable + """ + worker = (self._idle.pop() if self._idle + else self._createWorker()) + if worker is None: + # The createWorker method may return None if we're out of resources + # to create workers. + self._pending.append(task) + return + self._busyCount += 1 + @worker.do + def doWork(): + try: + task() + except: + self._logException() + + @self._coordinator.do + def idleAndPending(): + self._busyCount -= 1 + self._recycleWorker(worker) + + + def _recycleWorker(self, worker): + """ + Called only from coordinator. + + Recycle the given worker into the idle pool. + + @param worker: a worker created by C{createWorker} and now idle. + @type worker: L{IWorker} + """ + self._idle.add(worker) + if self._pending: + # Re-try the first enqueued thing. + # (Explicitly do _not_ honor _quit.) + self._coordinateThisTask(self._pending.popleft()) + elif self._shouldQuitCoordinator: + self._quitIdlers() + elif self._toShrink > 0: + self._toShrink -= 1 + self._idle.remove(worker) + worker.quit() + + + def quit(self): + """ + Stop doing work and shut down all idle workers. + """ + self._quit.set() + # In case all the workers are idle when we do this. + @self._coordinator.do + def startFinishing(): + self._shouldQuitCoordinator = True + self._quitIdlers() |