1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
|
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
A win32event based implementation of the Twisted main loop.
This requires pywin32 (formerly win32all) or ActivePython to be installed.
To install the event loop (and you should do this before any connections,
listeners or connectors are added)::

    from twisted.internet import win32eventreactor
    win32eventreactor.install()
LIMITATIONS:
1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
2. Process running has some problems (see L{twisted.internet.process} docstring).
TODO:
1. Event loop handling of writes is *very* problematic (this is causing failed tests).
Switch to doing it the correct way, whatever that means (see below).
2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.
ALTERNATIVE SOLUTIONS:
- IIRC, sockets can only be registered once. So we switch to a structure
like the poll() reactor, thus allowing us to deal with write events in
a decent fashion. This should allow us to pass tests, but we're still
limited to 64 events.
Or:
- Instead of doing a reactor, we make this an addon to the select reactor.
The WFMO event loop runs in a separate thread. This means no need to maintain
separate code for networking, 64 event limit doesn't apply to sockets,
we can run processes and other win32 stuff in default event loop. The
only problem is that we're stuck with the icky socket based waker.
Another benefit is that this could be extended to support >64 events
in a simpler manner than the previous solution.
The 2nd solution is probably what will get implemented.
"""
import sys
# System imports
import time
from threading import Thread
from weakref import WeakKeyDictionary
from zope.interface import implementer
# Win32 imports
from win32file import ( # type: ignore[import]
FD_ACCEPT,
FD_CLOSE,
FD_CONNECT,
FD_READ,
WSAEventSelect,
)
try:
    # WSAEnumNetworkEvents was added in pywin32 215
    from win32file import WSAEnumNetworkEvents
except ImportError:
    # Older pywin32: warn at import time and fall back to a stub so the rest
    # of the module can still call WSAEnumNetworkEvents unconditionally.
    import warnings

    warnings.warn(
        "Reliable disconnection notification requires pywin32 215 or later",
        category=UserWarning,
    )

    def WSAEnumNetworkEvents(fd, event):
        """
        Fallback used when pywin32 does not provide C{WSAEnumNetworkEvents}.

        Both arguments are ignored and the result is always a set containing
        only C{FD_READ}, so a caller checking the result for C{FD_CLOSE} will
        never find it.
        """
        return {FD_READ}
import win32gui # type: ignore[import]
from win32event import ( # type: ignore[import]
QS_ALLINPUT,
WAIT_OBJECT_0,
WAIT_TIMEOUT,
CreateEvent,
MsgWaitForMultipleObjects,
)
# Twisted imports
from twisted.internet import posixbase
from twisted.internet.interfaces import IReactorFDSet, IReactorWin32Events
from twisted.internet.threads import blockingCallFromThread
from twisted.python import failure, log, threadable
@implementer(IReactorFDSet, IReactorWin32Events)
class Win32Reactor(posixbase.PosixReactorBase):
    """
    A reactor whose main loop is driven by Win32 event APIs.

    @ivar _reads: Maps each L{FileDescriptor} registered for reading to the
        win32 event object used to detect read activity on it.

    @ivar _writes: Maps each L{FileDescriptor} registered for writing to an
        arbitrary value.  Every key is given a chance to write out its data
        on each iteration.

    @ivar _events: Maps each win32 event object to a C{(descriptor, action)}
        pair, where C{action} names the method of C{descriptor} to call when
        the event is signalled.

    @ivar _closedAndReading: Together with C{_closedAndNotReading}, remembers
        descriptors for which the OS has already delivered close notification
        but from which we have not yet finished reading.
        MsgWaitForMultipleObjects delivers close notification only once, so it
        is recorded here until we are ready to act on it.  Descriptors in this
        dictionary are also registered as readers, so they are processed on
        every iteration.  When such a descriptor is passed to
        L{IReactorFDSet.removeReader} it moves to C{_closedAndNotReading}.
        Keys are the descriptors; values are arbitrary.
    @type _closedAndReading: C{dict}

    @ivar _closedAndNotReading: Descriptors which have received close
        notification from the OS but are not currently registered as readers.
        They are kept only to remember their closed state and are not
        processed.  When one is passed to L{IReactorFDSet.addReader} it moves
        to C{_closedAndReading}.  Keys are the descriptors; values are
        arbitrary.  The dictionary holds its keys weakly so that a descriptor
        the application has dropped is forgotten by the reactor as well.
    @type _closedAndNotReading: C{WeakKeyDictionary}
    """

    # Placeholder handle to wait on when there are writers but no registered
    # events.
    dummyEvent = CreateEvent(None, 0, 0, None)

    def __init__(self):
        # All bookkeeping is created before the base class initializer runs.
        self._reads = {}
        self._writes = {}
        self._events = {}
        self._closedAndReading = {}
        self._closedAndNotReading = WeakKeyDictionary()
        posixbase.PosixReactorBase.__init__(self)

    def _makeSocketEvent(self, fd, action, why):
        """
        Create a win32 event object bound to a socket and register it.

        @param fd: The descriptor handed to C{WSAEventSelect}.
        @param action: Name of the method of C{fd} to run when the event
            fires.
        @param why: The network event mask passed to C{WSAEventSelect}.

        @return: The newly created event object.
        """
        socketEvent = CreateEvent(None, 0, 0, None)
        WSAEventSelect(fd, socketEvent, why)
        self._events[socketEvent] = (fd, action)
        return socketEvent

    def addEvent(self, event, fd, action):
        """
        Register a win32 event with the event loop.

        @param event: The win32 event object to wait on.
        @param fd: The object notified when C{event} is signalled.
        @param action: Name of the method of C{fd} to call.
        """
        self._events[event] = (fd, action)

    def removeEvent(self, event):
        """
        Unregister a previously added event.

        @raise KeyError: If C{event} is not currently registered.
        """
        self._events.pop(event)

    def addReader(self, reader):
        """
        Register a socket FileDescriptor for notification of data available
        to read.
        """
        if reader in self._reads:
            return
        mask = FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE
        self._reads[reader] = self._makeSocketEvent(reader, "doRead", mask)
        # A descriptor whose close notification was already seen goes back
        # into the set of closed descriptors that are being read.
        if reader in self._closedAndNotReading:
            self._closedAndReading[reader] = True
            del self._closedAndNotReading[reader]

    def addWriter(self, writer):
        """
        Register a socket FileDescriptor for notification of data available
        to write.
        """
        if writer in self._writes:
            return
        self._writes[writer] = 1

    def removeReader(self, reader):
        """
        Stop notifying a Selectable of data available to read.
        """
        if reader not in self._reads:
            return
        readEvent = self._reads[reader]
        del self._events[readEvent]
        del self._reads[reader]
        # Keep remembering that the descriptor is closed, but park it with
        # the descriptors that are not being read.
        if reader in self._closedAndReading:
            self._closedAndNotReading[reader] = True
            del self._closedAndReading[reader]

    def removeWriter(self, writer):
        """
        Stop notifying a Selectable of data available to write.
        """
        if writer not in self._writes:
            return
        del self._writes[writer]

    def removeAll(self):
        """
        Remove every selectable and return a list of them.
        """
        return self._removeAll(self._reads, self._writes)

    def getReaders(self):
        """
        Return a list of the descriptors currently registered for reading.
        """
        return list(self._reads)

    def getWriters(self):
        """
        Return a list of the descriptors currently registered for writing.
        """
        return list(self._writes)

    def doWaitForMultipleEvents(self, timeout):
        """
        Run one iteration of the event loop.

        Closed-but-still-reading descriptors and all writers are serviced
        first; then the registered events are waited on and at most one
        signalled event is dispatched.

        @param timeout: Maximum number of seconds to wait, or L{None} to use
            a default of 100 seconds.
        """
        log.msg(channel="system", event="iteration", reactor=self)
        if timeout is None:
            timeout = 100

        # Descriptors that are trying to close are dealt with first.
        closingReaders = list(self._closedAndReading)
        for reader in closingReaders:
            self._runAction("doRead", reader)

        # The writers are snapshotted only after the closing readers have
        # run, since those may have changed the set of writers.
        pendingWriters = list(self._writes)
        for writer in pendingWriters:
            log.callWithLogger(writer, self._runWrite, writer)

        if closingReaders or pendingWriters:
            # Application code ran, so it *might* have scheduled a timed
            # call, stopped the reactor, or done something else that means we
            # should not block for the full timeout.  Assume it did; if that
            # is wrong we simply come back here shortly.
            timeout = 0

        if not self._events and not self._writes:
            # Nothing to wait on: sleep so we don't suck up CPU time.
            time.sleep(timeout)
            return

        handles = list(self._events) or [self.dummyEvent]
        milliseconds = int(timeout * 1000)
        signalled = MsgWaitForMultipleObjects(
            handles, 0, milliseconds, QS_ALLINPUT
        )

        if signalled == WAIT_TIMEOUT:
            return

        firstHandle = WAIT_OBJECT_0
        pastLastHandle = WAIT_OBJECT_0 + len(handles)

        if signalled == pastLastHandle:
            # Input arrived on the message queue rather than on a handle.
            if win32gui.PumpWaitingMessages():
                self.callLater(0, self.stop)
            return

        if not (firstHandle <= signalled < pastLastHandle):
            return

        event = handles[signalled - firstHandle]
        fd, action = self._events[event]

        if fd in self._reads:
            # Before anything, make sure it's still a valid file descriptor.
            fileno = fd.fileno()
            if fileno == -1:
                self._disconnectSelectable(fd, posixbase._NO_FILEDESC, False)
                return

            # It is a socket (not an arbitrary event added via addEvent) and
            # FD_CLOSE was requested, so check whether FD_CLOSE is what we
            # got.  It is delivered only once; if we miss it here we will
            # never learn that the connection closed.
            if FD_CLOSE in WSAEnumNetworkEvents(fileno, event):
                self._closedAndReading[fd] = True

        log.callWithLogger(fd, self._runAction, action, fd)

    def _runWrite(self, fd):
        """
        Give C{fd} a chance to write, disconnecting it if the write reports a
        failure or raises.

        @return: C{1} if C{doWrite} returned L{None}, otherwise L{None}.
        """
        result = 0
        try:
            result = fd.doWrite()
        except BaseException as error:
            result = error
            log.deferr()

        if result:
            self.removeReader(fd)
            self.removeWriter(fd)
            try:
                fd.connectionLost(failure.Failure(result))
            except BaseException:
                log.deferr()
            return None
        if result is None:
            return 1
        return None

    def _runAction(self, action, fd):
        """
        Call the method of C{fd} named by C{action} and disconnect C{fd} if
        the call raises or returns a true value.
        """
        try:
            outcome = getattr(fd, action)()
        except BaseException as error:
            outcome = error
            log.deferr()

        if outcome:
            isRead = action == "doRead"
            self._disconnectSelectable(fd, outcome, isRead)

    doIteration = doWaitForMultipleEvents
class _ThreadFDWrapper:
    """
    Adapter around an event handler which turns a notification received in
    the helper L{Win32Reactor} thread into a notification delivered in the
    primary reactor thread.

    @ivar _reactor: The primary reactor, which receives the forwarded
        notifications.

    @ivar _fd: The L{FileDescriptor} the event is ultimately dispatched to.

    @ivar _action: A C{str} naming the method of C{_fd} which handles the
        event.

    @ivar _logPrefix: The log prefix of C{_fd}, fetched ahead of time so that
        C{_fd.logPrefix} never has to be called outside the main thread.
    """

    def __init__(self, reactor, fd, action, logPrefix):
        self._reactor, self._fd = reactor, fd
        self._action, self._logPrefix = action, logPrefix

    def logPrefix(self):
        """
        Return the log prefix that was supplied to C{__init__}.
        """
        return self._logPrefix

    def _execute(self):
        """
        Handle the associated event being set.

        The C{action} method of the wrapped descriptor is run in the main
        reactor thread; whatever it returns or raises is returned or raised
        here, so that this handler is removed from the helper reactor when
        appropriate.
        """

        def runAction():
            # Looked up lazily so the attribute access also happens in the
            # main reactor thread.
            handler = getattr(self._fd, self._action)
            return handler()

        return blockingCallFromThread(self._reactor, runAction)

    def connectionLost(self, reason):
        """
        Forward to the wrapped descriptor's C{connectionLost}, scheduled in
        the main reactor thread rather than the helper C{Win32Reactor}
        thread.
        """
        target = self._fd.connectionLost
        self._reactor.callFromThread(target, reason)
@implementer(IReactorWin32Events)
class _ThreadedWin32EventsMixin:
    """
    This mixin implements L{IReactorWin32Events} for another reactor by
    running a L{Win32Reactor} in a separate thread and dispatching work to it.

    @ivar _reactor: The L{Win32Reactor} running in the other thread.  This is
        L{None} until it is actually needed, and again after shutdown.

    @ivar _reactorThread: The L{threading.Thread} which is running the
        L{Win32Reactor}.  This is L{None} until it is actually needed.
    """

    _reactor = None
    _reactorThread = None

    def _unmakeHelperReactor(self):
        """
        Stop and discard the reactor started by C{_makeHelperReactor}.

        Does nothing if there is no helper reactor, so running it more than
        once is harmless.
        """
        helper = self._reactor
        if helper is None:
            return
        helper.callFromThread(helper.stop)
        self._reactor = None

    def _makeHelperReactor(self):
        """
        Create and (in a new thread) start a L{Win32Reactor} instance to use
        for the implementation of L{IReactorWin32Events}.
        """
        self._reactor = Win32Reactor()
        # This is a helper reactor, it is not the global reactor and its
        # thread is not "the" I/O thread.  Prevent it from registering it as
        # such.
        self._reactor._registerAsIOThread = False
        # NOTE(review): the single positional False is presumably run()'s
        # installSignalHandlers flag -- confirm against PosixReactorBase.run.
        self._reactorThread = Thread(target=self._reactor.run, args=(False,))
        self.addSystemEventTrigger("after", "shutdown", self._unmakeHelperReactor)
        self._reactorThread.start()

    def addEvent(self, event, fd, action):
        """
        Start the helper reactor if necessary and register C{event} with it.

        C{fd} is wrapped in a L{_ThreadFDWrapper} so that C{action} runs in
        this reactor's thread; C{fd.logPrefix()} is evaluated here, in the
        calling thread, for the same reason.

        @see: L{IReactorWin32Events}
        """
        if self._reactor is None:
            self._makeHelperReactor()
        self._reactor.callFromThread(
            self._reactor.addEvent,
            event,
            _ThreadFDWrapper(self, fd, action, fd.logPrefix()),
            "_execute",
        )

    def removeEvent(self, event):
        """
        Ask the helper reactor to forget C{event}.

        If no helper reactor exists (none was ever started, or it was already
        discarded at shutdown) there can be no registered events, so the call
        is a no-op instead of failing on L{None}.

        @see: L{IReactorWin32Events}
        """
        helper = self._reactor
        if helper is None:
            return
        helper.callFromThread(helper.removeEvent, event)
def install():
    """
    Create a L{Win32Reactor} and install it as the global reactor.

    Thread support is initialized first via C{threadable.init}.
    """
    threadable.init(1)
    reactor = Win32Reactor()
    # Imported here rather than at module level, as in the original.
    from . import main

    main.installReactor(reactor)
# Names exported by ``from ... import *``; the threaded helper classes are
# deliberately left out.
__all__ = ["Win32Reactor", "install"]
|