1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# -*- test-case-name: twisted._threads.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Top level thread pool interface, used to implement
L{twisted.python.threadpool}.
"""
from __future__ import absolute_import, division, print_function
from threading import Thread, Lock, local as LocalStorage
try:
from Queue import Queue
except ImportError:
from queue import Queue
from twisted.python.log import err
from ._threadworker import LockWorker
from ._team import Team
from ._threadworker import ThreadWorker
def pool(currentLimit, threadFactory=Thread):
    """
    Build a L{Team} that behaves as a thread pool: each worker it creates is
    a L{ThreadWorker} running in its own thread, and the number of workers is
    bounded by C{currentLimit}.

    The returned L{Team} is always coordinated by a L{LockWorker} built from
    a fresh L{Lock} and a fresh thread-local storage object; exceptions are
    reported through L{twisted.python.log.err}.

    @note: Future maintainers: the public API for an eventual move to
        twisted.threads should look I{something} like this, and this function
        is required to implement the API described by
        L{twisted.python.threadpool}.  However, a hard upper limit on
        threadpool size is probably a bad idea (it turns memory performance
        issues into correctness issues well before there is any memory
        pressure).  A better design would integrate with the reactor to
        slowly release idle threads when they are not needed, and would
        I{rate} limit the creation of new threads rather than hard-capping
        it.

    @param currentLimit: a callable that returns the current limit on the
        number of workers that the returned L{Team} should create; it is
        consulted every time the team asks for a new worker, and if the team
        already has at least that many workers (busy plus idle), no new
        worker is created.
    @type currentLimit: 0-argument callable returning L{int}

    @param threadFactory: a callable invoked as C{threadFactory(target=f)}
        for each new worker; C{start()} is called on the object it returns.
        Defaults to L{threading.Thread}.

    @return: a new L{Team}.
    """
    def launchThread(target):
        # Build the thread object first, then start it; whatever start()
        # returns is handed back to the caller unchanged.
        thread = threadFactory(target=target)
        return thread.start()

    def createWorkerWithinLimit():
        # 'team' is bound further down in the enclosing scope; this closure
        # only runs once the Team exists and asks for a worker.
        counts = team.statistics()
        workerCount = counts.busyWorkerCount + counts.idleWorkerCount
        if workerCount < currentLimit():
            return ThreadWorker(launchThread, Queue())
        # At or over the limit: decline to create another worker.
        return None

    coordinator = LockWorker(Lock(), LocalStorage())
    team = Team(
        coordinator=coordinator,
        createWorker=createWorkerWithinLimit,
        logException=err,
    )
    return team
|