aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/internet/win32eventreactor.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/internet/win32eventreactor.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/internet/win32eventreactor.py')
-rw-r--r--contrib/python/Twisted/py2/twisted/internet/win32eventreactor.py429
1 files changed, 429 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/internet/win32eventreactor.py b/contrib/python/Twisted/py2/twisted/internet/win32eventreactor.py
new file mode 100644
index 0000000000..e26c230797
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/internet/win32eventreactor.py
@@ -0,0 +1,429 @@
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+
+"""
+A win32event based implementation of the Twisted main loop.
+
+This requires pywin32 (formerly win32all) or ActivePython to be installed.
+
+To install the event loop (and you should do this before any connections,
+listeners or connectors are added)::
+
+ from twisted.internet import win32eventreactor
+ win32eventreactor.install()
+
+LIMITATIONS:
+ 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
+ 2. Process running has some problems (see L{twisted.internet.process} docstring).
+
+
+TODO:
+ 1. Event loop handling of writes is *very* problematic (this is causing failed tests).
+ Switch to doing it the correct way, whatever that means (see below).
+ 2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
+ 3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.
+
+
+ALTERNATIVE SOLUTIONS:
+ - IIRC, sockets can only be registered once. So we switch to a structure
+ like the poll() reactor, thus allowing us to deal with write events in
+ a decent fashion. This should allow us to pass tests, but we're still
+ limited to 64 events.
+
+Or:
+
+ - Instead of doing a reactor, we make this an addon to the select reactor.
+ The WFMO event loop runs in a separate thread. This means no need to maintain
+ separate code for networking, 64 event limit doesn't apply to sockets,
+ we can run processes and other win32 stuff in default event loop. The
+ only problem is that we're stuck with the icky socket based waker.
+ Another benefit is that this could be extended to support >64 events
+ in a simpler manner than the previous solution.
+
+The 2nd solution is probably what will get implemented.
+"""
+
+# System imports
+import time
+import sys
+from threading import Thread
+from weakref import WeakKeyDictionary
+
+from zope.interface import implementer
+
+# Win32 imports
+from win32file import FD_READ, FD_CLOSE, FD_ACCEPT, FD_CONNECT, WSAEventSelect
+try:
+ # WSAEnumNetworkEvents was added in pywin32 215
+ from win32file import WSAEnumNetworkEvents
+except ImportError:
+ import warnings
+ warnings.warn(
+ 'Reliable disconnection notification requires pywin32 215 or later',
+ category=UserWarning)
+ def WSAEnumNetworkEvents(fd, event):
+ return set([FD_READ])
+
+from win32event import CreateEvent, MsgWaitForMultipleObjects
+from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT, QS_ALLINPUT
+
+import win32gui
+
+# Twisted imports
+from twisted.internet import posixbase
+from twisted.python import log, threadable, failure
+from twisted.internet.interfaces import IReactorFDSet
+from twisted.internet.interfaces import IReactorWin32Events
+from twisted.internet.threads import blockingCallFromThread
+
+
+@implementer(IReactorFDSet, IReactorWin32Events)
+class Win32Reactor(posixbase.PosixReactorBase):
+ """
+ Reactor that uses Win32 event APIs.
+
+ @ivar _reads: A dictionary mapping L{FileDescriptor} instances to a
+ win32 event object used to check for read events for that descriptor.
+
+ @ivar _writes: A dictionary mapping L{FileDescriptor} instances to a
+ arbitrary value. Keys in this dictionary will be given a chance to
+ write out their data.
+
+ @ivar _events: A dictionary mapping win32 event object to tuples of
+ L{FileDescriptor} instances and event masks.
+
+ @ivar _closedAndReading: Along with C{_closedAndNotReading}, keeps track of
+ descriptors which have had close notification delivered from the OS but
+ which we have not finished reading data from. MsgWaitForMultipleObjects
+ will only deliver close notification to us once, so we remember it in
+ these two dictionaries until we're ready to act on it. The OS has
+ delivered close notification for each descriptor in this dictionary, and
+ the descriptors are marked as allowed to handle read events in the
+ reactor, so they can be processed. When a descriptor is marked as not
+ allowed to handle read events in the reactor (ie, it is passed to
+ L{IReactorFDSet.removeReader}), it is moved out of this dictionary and
+ into C{_closedAndNotReading}. The descriptors are keys in this
+ dictionary. The values are arbitrary.
+ @type _closedAndReading: C{dict}
+
+ @ivar _closedAndNotReading: These descriptors have had close notification
+ delivered from the OS, but are not marked as allowed to handle read
+ events in the reactor. They are saved here to record their closed
+ state, but not processed at all. When one of these descriptors is
+ passed to L{IReactorFDSet.addReader}, it is moved out of this dictionary
+ and into C{_closedAndReading}. The descriptors are keys in this
+ dictionary. The values are arbitrary. This is a weak key dictionary so
+ that if an application tells the reactor to stop reading from a
+ descriptor and then forgets about that descriptor itself, the reactor
+ will also forget about it.
+ @type _closedAndNotReading: C{WeakKeyDictionary}
+ """
+ dummyEvent = CreateEvent(None, 0, 0, None)
+
+ def __init__(self):
+ self._reads = {}
+ self._writes = {}
+ self._events = {}
+ self._closedAndReading = {}
+ self._closedAndNotReading = WeakKeyDictionary()
+ posixbase.PosixReactorBase.__init__(self)
+
+
+ def _makeSocketEvent(self, fd, action, why):
+ """
+ Make a win32 event object for a socket.
+ """
+ event = CreateEvent(None, 0, 0, None)
+ WSAEventSelect(fd, event, why)
+ self._events[event] = (fd, action)
+ return event
+
+
+ def addEvent(self, event, fd, action):
+ """
+ Add a new win32 event to the event loop.
+ """
+ self._events[event] = (fd, action)
+
+
+ def removeEvent(self, event):
+ """
+ Remove an event.
+ """
+ del self._events[event]
+
+
+ def addReader(self, reader):
+ """
+ Add a socket FileDescriptor for notification of data available to read.
+ """
+ if reader not in self._reads:
+ self._reads[reader] = self._makeSocketEvent(
+ reader, 'doRead', FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE)
+ # If the reader is closed, move it over to the dictionary of reading
+ # descriptors.
+ if reader in self._closedAndNotReading:
+ self._closedAndReading[reader] = True
+ del self._closedAndNotReading[reader]
+
+
+ def addWriter(self, writer):
+ """
+ Add a socket FileDescriptor for notification of data available to write.
+ """
+ if writer not in self._writes:
+ self._writes[writer] = 1
+
+
+ def removeReader(self, reader):
+ """Remove a Selectable for notification of data available to read.
+ """
+ if reader in self._reads:
+ del self._events[self._reads[reader]]
+ del self._reads[reader]
+
+ # If the descriptor is closed, move it out of the dictionary of
+ # reading descriptors into the dictionary of waiting descriptors.
+ if reader in self._closedAndReading:
+ self._closedAndNotReading[reader] = True
+ del self._closedAndReading[reader]
+
+
+ def removeWriter(self, writer):
+ """Remove a Selectable for notification of data available to write.
+ """
+ if writer in self._writes:
+ del self._writes[writer]
+
+
+ def removeAll(self):
+ """
+ Remove all selectables, and return a list of them.
+ """
+ return self._removeAll(self._reads, self._writes)
+
+
+ def getReaders(self):
+ return list(self._reads.keys())
+
+
+ def getWriters(self):
+ return list(self._writes.keys())
+
+
+ def doWaitForMultipleEvents(self, timeout):
+ log.msg(channel='system', event='iteration', reactor=self)
+ if timeout is None:
+ timeout = 100
+
+ # Keep track of whether we run any application code before we get to the
+ # MsgWaitForMultipleObjects. If so, there's a chance it will schedule a
+ # new timed call or stop the reactor or do something else that means we
+ # shouldn't block in MsgWaitForMultipleObjects for the full timeout.
+ ranUserCode = False
+
+ # If any descriptors are trying to close, try to get them out of the way
+ # first.
+ for reader in list(self._closedAndReading.keys()):
+ ranUserCode = True
+ self._runAction('doRead', reader)
+
+ for fd in list(self._writes.keys()):
+ ranUserCode = True
+ log.callWithLogger(fd, self._runWrite, fd)
+
+ if ranUserCode:
+ # If application code *might* have scheduled an event, assume it
+ # did. If we're wrong, we'll get back here shortly anyway. If
+ # we're right, we'll be sure to handle the event (including reactor
+ # shutdown) in a timely manner.
+ timeout = 0
+
+ if not (self._events or self._writes):
+ # sleep so we don't suck up CPU time
+ time.sleep(timeout)
+ return
+
+ handles = list(self._events.keys()) or [self.dummyEvent]
+ timeout = int(timeout * 1000)
+ val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT)
+ if val == WAIT_TIMEOUT:
+ return
+ elif val == WAIT_OBJECT_0 + len(handles):
+ exit = win32gui.PumpWaitingMessages()
+ if exit:
+ self.callLater(0, self.stop)
+ return
+ elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
+ event = handles[val - WAIT_OBJECT_0]
+ fd, action = self._events[event]
+
+ if fd in self._reads:
+ # Before anything, make sure it's still a valid file descriptor.
+ fileno = fd.fileno()
+ if fileno == -1:
+ self._disconnectSelectable(fd, posixbase._NO_FILEDESC, False)
+ return
+
+ # Since it's a socket (not another arbitrary event added via
+ # addEvent) and we asked for FD_READ | FD_CLOSE, check to see if
+ # we actually got FD_CLOSE. This needs a special check because
+ # it only gets delivered once. If we miss it, it's gone forever
+ # and we'll never know that the connection is closed.
+ events = WSAEnumNetworkEvents(fileno, event)
+ if FD_CLOSE in events:
+ self._closedAndReading[fd] = True
+ log.callWithLogger(fd, self._runAction, action, fd)
+
+
+ def _runWrite(self, fd):
+ closed = 0
+ try:
+ closed = fd.doWrite()
+ except:
+ closed = sys.exc_info()[1]
+ log.deferr()
+
+ if closed:
+ self.removeReader(fd)
+ self.removeWriter(fd)
+ try:
+ fd.connectionLost(failure.Failure(closed))
+ except:
+ log.deferr()
+ elif closed is None:
+ return 1
+
+ def _runAction(self, action, fd):
+ try:
+ closed = getattr(fd, action)()
+ except:
+ closed = sys.exc_info()[1]
+ log.deferr()
+ if closed:
+ self._disconnectSelectable(fd, closed, action == 'doRead')
+
+ doIteration = doWaitForMultipleEvents
+
+
+
+class _ThreadFDWrapper(object):
+ """
+ This wraps an event handler and translates notification in the helper
+ L{Win32Reactor} thread into a notification in the primary reactor thread.
+
+ @ivar _reactor: The primary reactor, the one to which event notification
+ will be sent.
+
+ @ivar _fd: The L{FileDescriptor} to which the event will be dispatched.
+
+ @ivar _action: A C{str} giving the method of C{_fd} which handles the event.
+
+ @ivar _logPrefix: The pre-fetched log prefix string for C{_fd}, so that
+ C{_fd.logPrefix} does not need to be called in a non-main thread.
+ """
+ def __init__(self, reactor, fd, action, logPrefix):
+ self._reactor = reactor
+ self._fd = fd
+ self._action = action
+ self._logPrefix = logPrefix
+
+
+ def logPrefix(self):
+ """
+ Return the original handler's log prefix, as it was given to
+ C{__init__}.
+ """
+ return self._logPrefix
+
+
+ def _execute(self):
+ """
+ Callback fired when the associated event is set. Run the C{action}
+ callback on the wrapped descriptor in the main reactor thread and raise
+ or return whatever it raises or returns to cause this event handler to
+ be removed from C{self._reactor} if appropriate.
+ """
+ return blockingCallFromThread(
+ self._reactor, lambda: getattr(self._fd, self._action)())
+
+
+ def connectionLost(self, reason):
+ """
+ Pass through to the wrapped descriptor, but in the main reactor thread
+ instead of the helper C{Win32Reactor} thread.
+ """
+ self._reactor.callFromThread(self._fd.connectionLost, reason)
+
+
+
+@implementer(IReactorWin32Events)
+class _ThreadedWin32EventsMixin(object):
+ """
+ This mixin implements L{IReactorWin32Events} for another reactor by running
+ a L{Win32Reactor} in a separate thread and dispatching work to it.
+
+ @ivar _reactor: The L{Win32Reactor} running in the other thread. This is
+ L{None} until it is actually needed.
+
+ @ivar _reactorThread: The L{threading.Thread} which is running the
+ L{Win32Reactor}. This is L{None} until it is actually needed.
+ """
+
+ _reactor = None
+ _reactorThread = None
+
+
+ def _unmakeHelperReactor(self):
+ """
+ Stop and discard the reactor started by C{_makeHelperReactor}.
+ """
+ self._reactor.callFromThread(self._reactor.stop)
+ self._reactor = None
+
+
+ def _makeHelperReactor(self):
+ """
+ Create and (in a new thread) start a L{Win32Reactor} instance to use for
+ the implementation of L{IReactorWin32Events}.
+ """
+ self._reactor = Win32Reactor()
+ # This is a helper reactor, it is not the global reactor and its thread
+ # is not "the" I/O thread. Prevent it from registering it as such.
+ self._reactor._registerAsIOThread = False
+ self._reactorThread = Thread(
+ target=self._reactor.run, args=(False,))
+ self.addSystemEventTrigger(
+ 'after', 'shutdown', self._unmakeHelperReactor)
+ self._reactorThread.start()
+
+
+ def addEvent(self, event, fd, action):
+ """
+ @see: L{IReactorWin32Events}
+ """
+ if self._reactor is None:
+ self._makeHelperReactor()
+ self._reactor.callFromThread(
+ self._reactor.addEvent,
+ event, _ThreadFDWrapper(self, fd, action, fd.logPrefix()),
+ "_execute")
+
+
+ def removeEvent(self, event):
+ """
+ @see: L{IReactorWin32Events}
+ """
+ self._reactor.callFromThread(self._reactor.removeEvent, event)
+
+
+
+def install():
+ threadable.init(1)
+ r = Win32Reactor()
+ from . import main
+ main.installReactor(r)
+
+
+__all__ = ["Win32Reactor", "install"]