diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-08-25 12:54:32 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-08-25 13:03:33 +0300 |
commit | 4a64a813e1d34e732f35d8a65147974f76395a6f (patch) | |
tree | a8da0dede5213f85e45b95047cfbdcf5427cf0b7 /contrib/python/Twisted/py3/twisted/internet/_threadedselect.py | |
parent | e9bbee265681b79a9ef9795bdc84cf6996f9cfec (diff) | |
download | ydb-4a64a813e1d34e732f35d8a65147974f76395a6f.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/_threadedselect.py')
-rw-r--r-- | contrib/python/Twisted/py3/twisted/internet/_threadedselect.py | 465 |
1 files changed, 235 insertions, 230 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/_threadedselect.py b/contrib/python/Twisted/py3/twisted/internet/_threadedselect.py index 8a53e4ca962..1c7db16b0a6 100644 --- a/contrib/python/Twisted/py3/twisted/internet/_threadedselect.py +++ b/contrib/python/Twisted/py3/twisted/internet/_threadedselect.py @@ -10,14 +10,14 @@ arbitrary foreign event loop, such as those you find in GUI toolkits. There are three things you'll need to do to use this reactor. -Install the reactor at the beginning of your program, before importing -the rest of Twisted:: +Install the reactor at the beginning of your program, before importing the rest +of Twisted:: | from twisted.internet import _threadedselect | _threadedselect.install() -Interleave this reactor with your foreign event loop, at some point after -your event loop is initialized:: +Interleave this reactor with your foreign event loop, at some point after your +event loop is initialized:: | from twisted.internet import reactor | reactor.interleave(foreignEventLoopWakerFunction) @@ -31,68 +31,113 @@ reactor:: In order for Twisted to do its work in the main thread (the thread that interleave is called from), a waker function is necessary. The waker function -will be called from a "background" thread with one argument: func. -The waker function's purpose is to call func() from the main thread. -Many GUI toolkits ship with appropriate waker functions. -Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in -older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter. -These would be used in place of "foreignEventLoopWakerFunction" in the above -example. +will be called from a "background" thread with one argument: func. The waker +function's purpose is to call func() from the main thread. Many GUI toolkits +ship with appropriate waker functions. One example of this is wxPython's +wx.callAfter (may be wxCallAfter in older versions of wxPython). These would +be used in place of "foreignEventLoopWakerFunction" in the above example. The other integration point at which the foreign event loop and this reactor -must integrate is shutdown. In order to ensure clean shutdown of Twisted, -you must allow for Twisted to come to a complete stop before quitting the +must integrate is shutdown. In order to ensure clean shutdown of Twisted, you +must allow for Twisted to come to a complete stop before quitting the application. Typically, you will do this by setting up an after shutdown trigger to stop your foreign event loop, and call reactor.stop() where you would normally have initiated the shutdown procedure for the foreign event -loop. Shutdown functions that could be used in place of -"foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance -with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function. +loop. Shutdown functions that could be used in place of "foreignEventloopStop" +would be the ExitMainLoop method of the wxApp instance with wxPython. """ +from __future__ import annotations -import select -import sys from errno import EBADF, EINTR -from functools import partial from queue import Empty, Queue from threading import Thread +from typing import Any, Callable from zope.interface import implementer +from twisted._threads import ThreadWorker from twisted.internet import posixbase -from twisted.internet.interfaces import IReactorFDSet -from twisted.internet.posixbase import _NO_FILEDESC, _NO_FILENO -from twisted.internet.selectreactor import _select -from twisted.python import failure, log, threadable +from twisted.internet.interfaces import IReactorFDSet, IReadDescriptor, IWriteDescriptor +from twisted.internet.selectreactor import _preenDescriptors, _select +from twisted.logger import Logger +from twisted.python.log import callWithLogger as _callWithLogger - -def dictRemove(dct, value): - try: - del dct[value] - except KeyError: - pass +_log = Logger() def raiseException(e): raise e +def _threadsafeSelect( + timeout: float | None, + readmap: dict[int, IReadDescriptor], + writemap: dict[int, IWriteDescriptor], + handleResult: Callable[ + [ + list[int], + list[int], + dict[int, IReadDescriptor], + dict[int, IWriteDescriptor], + bool, + ], + None, + ], +) -> None: + """ + Invoke C{select}. This will be called in a non-main thread, so it is very + careful to work only on integers and avoid calling any application code. + """ + preen = False + r = [] + w = [] + while 1: + readints = readmap.keys() + writeints = writemap.keys() + try: + result = _select(readints, writeints, [], timeout) + except ValueError: + # Possible problems with file descriptors that were passed: + # ValueError may indicate that a file descriptor has gone negative. + preen = True + break + except OSError as se: + # The select() system call encountered an error. + if se.args[0] == EINTR: + # EINTR is hard to replicate in tests using an actual select(), + # and I don't want to dedicate effort to testing this function + # when it needs to be refactored with selectreactor. + + return # pragma: no cover + elif se.args[0] == EBADF: + preen = True + break + else: + # OK, I really don't know what's going on. Blow up. Never + # mind with the coverage here, since we are just trying to make + # sure we don't swallow an exception. + raise # pragma: no cover + else: + r, w, ignored = result + break + handleResult(r, w, readmap, writemap, preen) + + @implementer(IReactorFDSet) class ThreadedSelectReactor(posixbase.PosixReactorBase): """A threaded select() based reactor - runs on all POSIX platforms and on Win32. """ - def __init__(self): - threadable.init(1) - self.reads = {} - self.writes = {} - self.toThreadQueue = Queue() - self.toMainThread = Queue() - self.workerThread = None - self.mainWaker = None + def __init__( + self, waker: Callable[[Callable[[], None]], None] | None = None + ) -> None: + self.reads: set[IReadDescriptor] = set() + self.writes: set[IWriteDescriptor] = set() posixbase.PosixReactorBase.__init__(self) - self.addSystemEventTrigger("after", "shutdown", self._mainLoopShutdown) + self._selectorThread: ThreadWorker | None = None + self.mainWaker = waker + self._iterationQueue: Queue[Callable[[], None]] | None = None def wakeUp(self): # we want to wake up from any thread @@ -103,205 +148,131 @@ class ThreadedSelectReactor(posixbase.PosixReactorBase): self.wakeUp() return tple - def _sendToMain(self, msg, *args): - self.toMainThread.put((msg, args)) - if self.mainWaker is not None: - self.mainWaker() - - def _sendToThread(self, fn, *args): - self.toThreadQueue.put((fn, args)) - - def _preenDescriptorsInThread(self): - log.msg("Malformed file descriptor found. Preening lists.") - readers = self.reads.keys() - writers = self.writes.keys() - self.reads.clear() - self.writes.clear() - for selDict, selList in ((self.reads, readers), (self.writes, writers)): - for selectable in selList: - try: - select.select([selectable], [selectable], [selectable], 0) - except BaseException: - log.msg("bad descriptor %s" % selectable) - else: - selDict[selectable] = 1 + def _doReadOrWrite(self, selectable: object, method: str) -> None: + with _log.failuresHandled( + "while handling selectable {sel}", sel=selectable + ) as op: + why = getattr(selectable, method)() + if (fail := op.failure) is not None: + why = fail.value + if why: + self._disconnectSelectable(selectable, why, method == "doRead") - def _workerInThread(self): - try: - while 1: - fn, args = self.toThreadQueue.get() - fn(*args) - except SystemExit: - pass # Exception indicates this thread should exit - except BaseException: - f = failure.Failure() - self._sendToMain("Failure", f) - - def _doSelectInThread(self, timeout): - """Run one iteration of the I/O monitor loop. - - This will run all selectables who had input or output readiness - waiting for them. - """ - reads = self.reads - writes = self.writes - while 1: - try: - r, w, ignored = _select(reads.keys(), writes.keys(), [], timeout) - break - except ValueError: - # Possibly a file descriptor has gone negative? - log.err() - self._preenDescriptorsInThread() - except TypeError: - # Something *totally* invalid (object w/o fileno, non-integral - # result) was passed - log.err() - self._preenDescriptorsInThread() - except OSError as se: - # select(2) encountered an error - if se.args[0] in (0, 2): - # windows does this if it got an empty list - if (not reads) and (not writes): - return - else: - raise - elif se.args[0] == EINTR: + def _selectOnce(self, timeout: float | None, keepGoing: bool) -> None: + reads: dict[int, Any] = {} + writes: dict[int, Any] = {} + for isRead, fdmap, d in [ + (True, self.reads, reads), + (False, self.writes, writes), + ]: + for each in fdmap: # type:ignore[attr-defined] + d[each.fileno()] = each + + mainWaker = self.mainWaker + assert mainWaker is not None, ( + "neither .interleave() nor .mainLoop() / .run() called, " + "but we are somehow running the reactor" + ) + + def callReadsAndWrites( + r: list[int], + w: list[int], + readmap: dict[int, IReadDescriptor], + writemap: dict[int, IWriteDescriptor], + preen: bool, + ) -> None: + @mainWaker + def onMainThread() -> None: + if preen: + _preenDescriptors( + self.reads, self.writes, self._disconnectSelectable + ) return - elif se.args[0] == EBADF: - self._preenDescriptorsInThread() + _drdw = self._doReadOrWrite + + for readable in r: + rselectable = readmap[readable] + if rselectable in self.reads: + _callWithLogger(rselectable, _drdw, rselectable, "doRead") + + for writable in w: + wselectable = writemap[writable] + if wselectable in self.writes: + _callWithLogger(wselectable, _drdw, wselectable, "doWrite") + + self.runUntilCurrent() + if self._started and keepGoing: + # see coverage note in .interleave() + self._selectOnce(self.timeout(), True) # pragma: no cover else: - # OK, I really don't know what's going on. Blow up. - raise - self._sendToMain("Notify", r, w) - - def _process_Notify(self, r, w): - reads = self.reads - writes = self.writes - - _drdw = self._doReadOrWrite - _logrun = log.callWithLogger - for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)): - for selectable in selectables: - # if this was disconnected in another thread, kill it. - if selectable not in dct: - continue - # This for pausing input when we're not ready for more. - _logrun(selectable, _drdw, selectable, method, dct) - - def _process_Failure(self, f): - f.raiseException() - - _doIterationInThread = _doSelectInThread - - def ensureWorkerThread(self): - if self.workerThread is None or not self.workerThread.isAlive(): - self.workerThread = Thread(target=self._workerInThread) - self.workerThread.start() - - def doThreadIteration(self, timeout): - self._sendToThread(self._doIterationInThread, timeout) - self.ensureWorkerThread() - msg, args = self.toMainThread.get() - getattr(self, "_process_" + msg)(*args) - - doIteration = doThreadIteration - - def _interleave(self): - while self.running: - self.runUntilCurrent() - t2 = self.timeout() - t = self.running and t2 - self._sendToThread(self._doIterationInThread, t) - yield None - msg, args = self.toMainThread.get_nowait() - getattr(self, "_process_" + msg)(*args) - - def interleave(self, waker, *args, **kw): + self._cleanUpThread() + + if self._selectorThread is None: + self._selectorThread = ThreadWorker( + lambda target: Thread(target=target).start(), Queue() + ) + self._selectorThread.do( + lambda: _threadsafeSelect(timeout, reads, writes, callReadsAndWrites) + ) + + def _cleanUpThread(self) -> None: + """ + Ensure that the selector thread is stopped. """ - interleave(waker) interleaves this reactor with the - current application by moving the blocking parts of - the reactor (select() in this case) to a separate - thread. This is typically useful for integration with - GUI applications which have their own event loop - already running. + oldThread, self._selectorThread = self._selectorThread, None + if oldThread is not None: + oldThread.quit() + + def interleave( + self, + waker: Callable[[Callable[[], None]], None], + installSignalHandlers: bool = True, + ) -> None: + """ + interleave(waker) interleaves this reactor with the current application + by moving the blocking parts of the reactor (select() in this case) to + a separate thread. This is typically useful for integration with GUI + applications which have their own event loop already running. See the module docstring for more information. """ - self.startRunning(*args, **kw) - loop = self._interleave() - - def mainWaker(waker=waker, loop=loop): - waker(partial(next, loop)) - - self.mainWaker = mainWaker - next(loop) - self.ensureWorkerThread() - - def _mainLoopShutdown(self): - self.mainWaker = None - if self.workerThread is not None: - self._sendToThread(raiseException, SystemExit) - self.wakeUp() - try: - while 1: - msg, args = self.toMainThread.get_nowait() - except Empty: - pass - self.workerThread.join() - self.workerThread = None - try: - while 1: - fn, args = self.toThreadQueue.get_nowait() - if fn is self._doIterationInThread: - log.msg("Iteration is still in the thread queue!") - elif fn is raiseException and args[0] is SystemExit: - pass - else: - fn(*args) - except Empty: - pass - - def _doReadOrWrite(self, selectable, method, dict): - try: - why = getattr(selectable, method)() - handfn = getattr(selectable, "fileno", None) - if not handfn: - why = _NO_FILENO - elif handfn() == -1: - why = _NO_FILEDESC - except BaseException: - why = sys.exc_info()[1] - log.err() - if why: - self._disconnectSelectable(selectable, why, method == "doRead") - - def addReader(self, reader): + # TODO: This method is excluded from coverage because it only happens + # in the case where we are actually running on a foreign event loop, + # and twisted's test suite isn't set up that way. It would be nice to + # add some dedicated tests for ThreadedSelectReactor that covered this + # case. + self.mainWaker = waker # pragma: no cover + self.startRunning(installSignalHandlers) # pragma: no cover + self._selectOnce(0.0, True) # pragma: no cover + + def addReader(self, reader: IReadDescriptor) -> None: """Add a FileDescriptor for notification of data available to read.""" - self._sendToThread(self.reads.__setitem__, reader, 1) + self.reads.add(reader) self.wakeUp() - def addWriter(self, writer): + def addWriter(self, writer: IWriteDescriptor) -> None: """Add a FileDescriptor for notification of data available to write.""" - self._sendToThread(self.writes.__setitem__, writer, 1) + self.writes.add(writer) self.wakeUp() - def removeReader(self, reader): + def removeReader(self, reader: IReadDescriptor) -> None: """Remove a Selectable for notification of data available to read.""" - self._sendToThread(dictRemove, self.reads, reader) + if reader in self.reads: + self.reads.remove(reader) - def removeWriter(self, writer): + def removeWriter(self, writer: IWriteDescriptor) -> None: """Remove a Selectable for notification of data available to write.""" - self._sendToThread(dictRemove, self.writes, writer) + if writer in self.writes: + self.writes.remove(writer) - def removeAll(self): - return self._removeAll(self.reads, self.writes) + def removeAll(self) -> list[IReadDescriptor | IWriteDescriptor]: + return self._removeAll(self.reads, self.writes) # type:ignore[no-any-return] - def getReaders(self): - return list(self.reads.keys()) + def getReaders(self) -> list[IReadDescriptor]: + return list(self.reads) - def getWriters(self): - return list(self.writes.keys()) + def getWriters(self) -> list[IWriteDescriptor]: + return list(self.writes) def stop(self): """ @@ -311,18 +282,52 @@ class ThreadedSelectReactor(posixbase.PosixReactorBase): posixbase.PosixReactorBase.stop(self) self.wakeUp() - def run(self, installSignalHandlers=True): - self.startRunning(installSignalHandlers=installSignalHandlers) - self.mainLoop() - - def mainLoop(self): - q = Queue() - self.interleave(q.put) - while self.running: - try: - q.get()() - except StopIteration: - break + def crash(self): + posixbase.PosixReactorBase.crash(self) + self.wakeUp() + + # The following methods are mostly for test-suite support, to make + # ThreadedSelectReactor behave like another reactor you might call run() + # on. + def _testMainLoopSetup(self) -> None: + """ + Mostly for compliance with L{IReactorCore} and usability with the + tests, set up a fake blocking main-loop; make the "foreign" main loop + we are interfacing with be C{self.mainLoop()}, that is reading from a + basic Queue. + """ + self._iterationQueue = Queue() + self.mainWaker = self._iterationQueue.put + + def _uninstallHandler(self) -> None: + """ + Handle uninstallation to ensure that cleanup is properly performed by + ReactorBuilder tests. + """ + super()._uninstallHandler() + self._cleanUpThread() + + def iterate(self, timeout: float = 0.0) -> None: + if self._iterationQueue is None and self.mainWaker is None: # pragma: no branch + self._testMainLoopSetup() + self.wakeUp() + super().iterate(timeout) + + def doIteration(self, timeout: float | None) -> None: + assert self._iterationQueue is not None + self._selectOnce(timeout, False) + try: + work = self._iterationQueue.get(timeout=timeout) + except Empty: + return + work() + + def mainLoop(self) -> None: + """ + This should not normally be run. + """ + self._testMainLoopSetup() + super().mainLoop() def install(): |