diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/internet/kqreactor.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/internet/kqreactor.py')
-rw-r--r-- | contrib/python/Twisted/py2/twisted/internet/kqreactor.py | 320 |
1 files changed, 320 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/internet/kqreactor.py b/contrib/python/Twisted/py2/twisted/internet/kqreactor.py new file mode 100644 index 0000000000..ffc40c385a --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/internet/kqreactor.py @@ -0,0 +1,320 @@ +# -*- test-case-name: twisted.test.test_kqueuereactor -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +A kqueue()/kevent() based implementation of the Twisted main loop. + +To use this reactor, start your application specifying the kqueue reactor:: + + twistd --reactor kqueue ... + +To install the event loop from code (and you should do this before any +connections, listeners or connectors are added):: + + from twisted.internet import kqreactor + kqreactor.install() +""" + +from __future__ import division, absolute_import + +import errno +import select + +from select import KQ_FILTER_READ, KQ_FILTER_WRITE +from select import KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF + +from zope.interface import implementer, declarations, Interface, Attribute + +from twisted.internet import main, posixbase +from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize +from twisted.python import log, failure + + + +class _IKQueue(Interface): + """ + An interface for KQueue implementations. + """ + kqueue = Attribute("An implementation of kqueue(2).") + kevent = Attribute("An implementation of kevent(2).") + +declarations.directlyProvides(select, _IKQueue) + + + +@implementer(IReactorFDSet, IReactorDaemonize) +class KQueueReactor(posixbase.PosixReactorBase): + """ + A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher + which has built in support for kqueue in the select module. + + @ivar _kq: A C{kqueue} which will be used to check for I/O readiness. + + @ivar _impl: The implementation of L{_IKQueue} to use. + + @ivar _selectables: A dictionary mapping integer file descriptors to + instances of L{FileDescriptor} which have been registered with the + reactor. All L{FileDescriptor}s which are currently receiving read or + write readiness notifications will be present as values in this + dictionary. + + @ivar _reads: A set containing integer file descriptors. Values in this + set will be registered with C{_kq} for read readiness notifications + which will be dispatched to the corresponding L{FileDescriptor} + instances in C{_selectables}. + + @ivar _writes: A set containing integer file descriptors. Values in this + set will be registered with C{_kq} for write readiness notifications + which will be dispatched to the corresponding L{FileDescriptor} + instances in C{_selectables}. + """ + + def __init__(self, _kqueueImpl=select): + """ + Initialize kqueue object, file descriptor tracking dictionaries, and + the base class. + + See: + - http://docs.python.org/library/select.html + - www.freebsd.org/cgi/man.cgi?query=kqueue + - people.freebsd.org/~jlemon/papers/kqueue.pdf + + @param _kqueueImpl: The implementation of L{_IKQueue} to use. A + hook for testing. + """ + self._impl = _kqueueImpl + self._kq = self._impl.kqueue() + self._reads = set() + self._writes = set() + self._selectables = {} + posixbase.PosixReactorBase.__init__(self) + + + def _updateRegistration(self, fd, filter, op): + """ + Private method for changing kqueue registration on a given FD + filtering for events given filter/op. This will never block and + returns nothing. + """ + self._kq.control([self._impl.kevent(fd, filter, op)], 0, 0) + + + def beforeDaemonize(self): + """ + Implement L{IReactorDaemonize.beforeDaemonize}. + """ + # Twisted-internal method called during daemonization (when application + # is started via twistd). This is called right before the magic double + # forking done for daemonization. We cleanly close the kqueue() and later + # recreate it. This is needed since a) kqueue() are not inherited across + # forks and b) twistd will create the reactor already before daemonization + # (and will also add at least 1 reader to the reactor, an instance of + # twisted.internet.posixbase._UnixWaker). + # + # See: twisted.scripts._twistd_unix.daemonize() + self._kq.close() + self._kq = None + + + def afterDaemonize(self): + """ + Implement L{IReactorDaemonize.afterDaemonize}. + """ + # Twisted-internal method called during daemonization. This is called right + # after daemonization and recreates the kqueue() and any readers/writers + # that were added before. Note that you MUST NOT call any reactor methods + # in between beforeDaemonize() and afterDaemonize()! + self._kq = self._impl.kqueue() + for fd in self._reads: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) + for fd in self._writes: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) + + + def addReader(self, reader): + """ + Implement L{IReactorFDSet.addReader}. + """ + fd = reader.fileno() + if fd not in self._reads: + try: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) + except OSError: + pass + finally: + self._selectables[fd] = reader + self._reads.add(fd) + + + def addWriter(self, writer): + """ + Implement L{IReactorFDSet.addWriter}. + """ + fd = writer.fileno() + if fd not in self._writes: + try: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) + except OSError: + pass + finally: + self._selectables[fd] = writer + self._writes.add(fd) + + + def removeReader(self, reader): + """ + Implement L{IReactorFDSet.removeReader}. + """ + wasLost = False + try: + fd = reader.fileno() + except: + fd = -1 + if fd == -1: + for fd, fdes in self._selectables.items(): + if reader is fdes: + wasLost = True + break + else: + return + if fd in self._reads: + self._reads.remove(fd) + if fd not in self._writes: + del self._selectables[fd] + if not wasLost: + try: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) + except OSError: + pass + + + def removeWriter(self, writer): + """ + Implement L{IReactorFDSet.removeWriter}. + """ + wasLost = False + try: + fd = writer.fileno() + except: + fd = -1 + if fd == -1: + for fd, fdes in self._selectables.items(): + if writer is fdes: + wasLost = True + break + else: + return + if fd in self._writes: + self._writes.remove(fd) + if fd not in self._reads: + del self._selectables[fd] + if not wasLost: + try: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) + except OSError: + pass + + + def removeAll(self): + """ + Implement L{IReactorFDSet.removeAll}. + """ + return self._removeAll( + [self._selectables[fd] for fd in self._reads], + [self._selectables[fd] for fd in self._writes]) + + + def getReaders(self): + """ + Implement L{IReactorFDSet.getReaders}. + """ + return [self._selectables[fd] for fd in self._reads] + + + def getWriters(self): + """ + Implement L{IReactorFDSet.getWriters}. + """ + return [self._selectables[fd] for fd in self._writes] + + + def doKEvent(self, timeout): + """ + Poll the kqueue for new events. + """ + if timeout is None: + timeout = 1 + + try: + events = self._kq.control([], len(self._selectables), timeout) + except OSError as e: + # Since this command blocks for potentially a while, it's possible + # EINTR can be raised for various reasons (for example, if the user + # hits ^C). + if e.errno == errno.EINTR: + return + else: + raise + + _drdw = self._doWriteOrRead + for event in events: + fd = event.ident + try: + selectable = self._selectables[fd] + except KeyError: + # Handles the infrequent case where one selectable's + # handler disconnects another. + continue + else: + log.callWithLogger(selectable, _drdw, selectable, fd, event) + + + def _doWriteOrRead(self, selectable, fd, event): + """ + Private method called when a FD is ready for reading, writing or was + lost. Do the work and raise errors where necessary. + """ + why = None + inRead = False + (filter, flags, data, fflags) = ( + event.filter, event.flags, event.data, event.fflags) + + if flags & KQ_EV_EOF and data and fflags: + why = main.CONNECTION_LOST + else: + try: + if selectable.fileno() == -1: + inRead = False + why = posixbase._NO_FILEDESC + else: + if filter == KQ_FILTER_READ: + inRead = True + why = selectable.doRead() + if filter == KQ_FILTER_WRITE: + inRead = False + why = selectable.doWrite() + except: + # Any exception from application code gets logged and will + # cause us to disconnect the selectable. + why = failure.Failure() + log.err(why, "An exception was raised from application code" \ + " while processing a reactor selectable") + + if why: + self._disconnectSelectable(selectable, why, inRead) + + doIteration = doKEvent + + + +def install(): + """ + Install the kqueue() reactor. + """ + p = KQueueReactor() + from twisted.internet.main import installReactor + installReactor(p) + + +__all__ = ["KQueueReactor", "install"] |