1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Extended thread dispatching support.
For basic support see reactor threading API docs.
"""
from __future__ import annotations
import queue as Queue
from typing import Callable, TypeVar
from typing_extensions import ParamSpec
from twisted.internet import defer
from twisted.internet.interfaces import IReactorFromThreads
from twisted.python import failure
from twisted.python.threadpool import ThreadPool
_P = ParamSpec("_P")
_R = TypeVar("_R")
def deferToThreadPool(
reactor: IReactorFromThreads,
threadpool: ThreadPool,
f: Callable[_P, _R],
*args: _P.args,
**kwargs: _P.kwargs,
) -> defer.Deferred[_R]:
"""
Call the function C{f} using a thread from the given threadpool and return
the result as a Deferred.
This function is only used by client code which is maintaining its own
threadpool. To run a function in the reactor's threadpool, use
C{deferToThread}.
@param reactor: The reactor in whose main thread the Deferred will be
invoked.
@param threadpool: An object which supports the C{callInThreadWithCallback}
method of C{twisted.python.threadpool.ThreadPool}.
@param f: The function to call.
@param args: positional arguments to pass to f.
@param kwargs: keyword arguments to pass to f.
@return: A Deferred which fires a callback with the result of f, or an
errback with a L{twisted.python.failure.Failure} if f throws an
exception.
"""
d: defer.Deferred[_R] = defer.Deferred()
def onResult(success: bool, result: _R | BaseException) -> None:
if success:
reactor.callFromThread(d.callback, result)
else:
reactor.callFromThread(d.errback, result)
threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs)
return d
def deferToThread(f, *args, **kwargs):
"""
Run a function in a thread and return the result as a Deferred.
@param f: The function to call.
@param args: positional arguments to pass to f.
@param kwargs: keyword arguments to pass to f.
@return: A Deferred which fires a callback with the result of f,
or an errback with a L{twisted.python.failure.Failure} if f throws
an exception.
"""
from twisted.internet import reactor
return deferToThreadPool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
def _runMultiple(tupleList):
"""
Run a list of functions.
"""
for f, args, kwargs in tupleList:
f(*args, **kwargs)
def callMultipleInThread(tupleList):
"""
Run a list of functions in the same thread.
tupleList should be a list of (function, argsList, kwargsDict) tuples.
"""
from twisted.internet import reactor
reactor.callInThread(_runMultiple, tupleList)
def blockingCallFromThread(reactor, f, *a, **kw):
"""
Run a function in the reactor from a thread, and wait for the result
synchronously. If the function returns a L{Deferred}, wait for its
result and return that.
@param reactor: The L{IReactorThreads} provider which will be used to
schedule the function call.
@param f: the callable to run in the reactor thread
@type f: any callable.
@param a: the arguments to pass to C{f}.
@param kw: the keyword arguments to pass to C{f}.
@return: the result of the L{Deferred} returned by C{f}, or the result
of C{f} if it returns anything other than a L{Deferred}.
@raise Exception: If C{f} raises a synchronous exception,
C{blockingCallFromThread} will raise that exception. If C{f}
returns a L{Deferred} which fires with a L{Failure},
C{blockingCallFromThread} will raise that failure's exception (see
L{Failure.raiseException}).
"""
queue = Queue.Queue()
def _callFromThread():
result = defer.maybeDeferred(f, *a, **kw)
result.addBoth(queue.put)
reactor.callFromThread(_callFromThread)
result = queue.get()
if isinstance(result, failure.Failure):
result.raiseException()
return result
__all__ = [
"deferToThread",
"deferToThreadPool",
"callMultipleInThread",
"blockingCallFromThread",
]
|