1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
|
# -*- test-case-name: twisted._threads.test.test_team -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Implementation of a L{Team} of workers; a thread-pool that can allocate work to
workers.
"""
from __future__ import absolute_import, division, print_function
from collections import deque
from zope.interface import implementer
from . import IWorker
from ._convenience import Quit
class Statistics(object):
"""
Statistics about a L{Team}'s current activity.
@ivar idleWorkerCount: The number of idle workers.
@type idleWorkerCount: L{int}
@ivar busyWorkerCount: The number of busy workers.
@type busyWorkerCount: L{int}
@ivar backloggedWorkCount: The number of work items passed to L{Team.do}
which have not yet been sent to a worker to be performed because not
enough workers are available.
@type backloggedWorkCount: L{int}
"""
def __init__(self, idleWorkerCount, busyWorkerCount,
backloggedWorkCount):
self.idleWorkerCount = idleWorkerCount
self.busyWorkerCount = busyWorkerCount
self.backloggedWorkCount = backloggedWorkCount
@implementer(IWorker)
class Team(object):
"""
A composite L{IWorker} implementation.
@ivar _quit: A L{Quit} flag indicating whether this L{Team} has been quit
yet. This may be set by an arbitrary thread since L{Team.quit} may be
called from anywhere.
@ivar _coordinator: the L{IExclusiveWorker} coordinating access to this
L{Team}'s internal resources.
@ivar _createWorker: a callable that will create new workers.
@ivar _logException: a 0-argument callable called in an exception context
when there is an unhandled error from a task passed to L{Team.do}
@ivar _idle: a L{set} of idle workers.
@ivar _busyCount: the number of workers currently busy.
@ivar _pending: a C{deque} of tasks - that is, 0-argument callables passed
to L{Team.do} - that are outstanding.
@ivar _shouldQuitCoordinator: A flag indicating that the coordinator should
be quit at the next available opportunity. Unlike L{Team._quit}, this
flag is only set by the coordinator.
@ivar _toShrink: the number of workers to shrink this L{Team} by at the
next available opportunity; set in the coordinator.
"""
def __init__(self, coordinator, createWorker, logException):
"""
@param coordinator: an L{IExclusiveWorker} which will coordinate access
to resources on this L{Team}; that is to say, an
L{IExclusiveWorker} whose C{do} method ensures that its given work
will be executed in a mutually exclusive context, not in parallel
with other work enqueued by C{do} (although possibly in parallel
with the caller).
@param createWorker: A 0-argument callable that will create an
L{IWorker} to perform work.
@param logException: A 0-argument callable called in an exception
context when the work passed to C{do} raises an exception.
"""
self._quit = Quit()
self._coordinator = coordinator
self._createWorker = createWorker
self._logException = logException
# Don't touch these except from the coordinator.
self._idle = set()
self._busyCount = 0
self._pending = deque()
self._shouldQuitCoordinator = False
self._toShrink = 0
def statistics(self):
"""
Gather information on the current status of this L{Team}.
@return: a L{Statistics} describing the current state of this L{Team}.
"""
return Statistics(len(self._idle), self._busyCount, len(self._pending))
def grow(self, n):
"""
Increase the the number of idle workers by C{n}.
@param n: The number of new idle workers to create.
@type n: L{int}
"""
self._quit.check()
@self._coordinator.do
def createOneWorker():
for x in range(n):
worker = self._createWorker()
if worker is None:
return
self._recycleWorker(worker)
def shrink(self, n=None):
"""
Decrease the number of idle workers by C{n}.
@param n: The number of idle workers to shut down, or L{None} (or
unspecified) to shut down all workers.
@type n: L{int} or L{None}
"""
self._quit.check()
self._coordinator.do(lambda: self._quitIdlers(n))
def _quitIdlers(self, n=None):
"""
The implmentation of C{shrink}, performed by the coordinator worker.
@param n: see L{Team.shrink}
"""
if n is None:
n = len(self._idle) + self._busyCount
for x in range(n):
if self._idle:
self._idle.pop().quit()
else:
self._toShrink += 1
if self._shouldQuitCoordinator and self._busyCount == 0:
self._coordinator.quit()
def do(self, task):
"""
Perform some work in a worker created by C{createWorker}.
@param task: the callable to run
"""
self._quit.check()
self._coordinator.do(lambda: self._coordinateThisTask(task))
def _coordinateThisTask(self, task):
"""
Select a worker to dispatch to, either an idle one or a new one, and
perform it.
This method should run on the coordinator worker.
@param task: the task to dispatch
@type task: 0-argument callable
"""
worker = (self._idle.pop() if self._idle
else self._createWorker())
if worker is None:
# The createWorker method may return None if we're out of resources
# to create workers.
self._pending.append(task)
return
self._busyCount += 1
@worker.do
def doWork():
try:
task()
except:
self._logException()
@self._coordinator.do
def idleAndPending():
self._busyCount -= 1
self._recycleWorker(worker)
def _recycleWorker(self, worker):
"""
Called only from coordinator.
Recycle the given worker into the idle pool.
@param worker: a worker created by C{createWorker} and now idle.
@type worker: L{IWorker}
"""
self._idle.add(worker)
if self._pending:
# Re-try the first enqueued thing.
# (Explicitly do _not_ honor _quit.)
self._coordinateThisTask(self._pending.popleft())
elif self._shouldQuitCoordinator:
self._quitIdlers()
elif self._toShrink > 0:
self._toShrink -= 1
self._idle.remove(worker)
worker.quit()
def quit(self):
"""
Stop doing work and shut down all idle workers.
"""
self._quit.set()
# In case all the workers are idle when we do this.
@self._coordinator.do
def startFinishing():
self._shouldQuitCoordinator = True
self._quitIdlers()
|