# -*- test-case-name: twisted.test.test_kqueuereactor -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
A kqueue()/kevent() based implementation of the Twisted main loop.
To use this reactor, start your application specifying the kqueue reactor::
twistd --reactor kqueue ...
To install the event loop from code (and you should do this before any
connections, listeners or connectors are added)::
from twisted.internet import kqreactor
kqreactor.install()
"""
from __future__ import division, absolute_import
import errno
import select
from select import KQ_FILTER_READ, KQ_FILTER_WRITE
from select import KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF
from zope.interface import implementer, declarations, Interface, Attribute
from twisted.internet import main, posixbase
from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize
from twisted.python import log, failure
class _IKQueue(Interface):
"""
An interface for KQueue implementations.
"""
kqueue = Attribute("An implementation of kqueue(2).")
kevent = Attribute("An implementation of kevent(2).")
declarations.directlyProvides(select, _IKQueue)
@implementer(IReactorFDSet, IReactorDaemonize)
class KQueueReactor(posixbase.PosixReactorBase):
"""
A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher
which has built in support for kqueue in the select module.
@ivar _kq: A C{kqueue} which will be used to check for I/O readiness.
@ivar _impl: The implementation of L{_IKQueue} to use.
@ivar _selectables: A dictionary mapping integer file descriptors to
instances of L{FileDescriptor} which have been registered with the
reactor. All L{FileDescriptor}s which are currently receiving read or
write readiness notifications will be present as values in this
dictionary.
@ivar _reads: A set containing integer file descriptors. Values in this
set will be registered with C{_kq} for read readiness notifications
which will be dispatched to the corresponding L{FileDescriptor}
instances in C{_selectables}.
@ivar _writes: A set containing integer file descriptors. Values in this
set will be registered with C{_kq} for write readiness notifications
which will be dispatched to the corresponding L{FileDescriptor}
instances in C{_selectables}.
"""
def __init__(self, _kqueueImpl=select):
"""
Initialize kqueue object, file descriptor tracking dictionaries, and
the base class.
See:
- http://docs.python.org/library/select.html
- www.freebsd.org/cgi/man.cgi?query=kqueue
- people.freebsd.org/~jlemon/papers/kqueue.pdf
@param _kqueueImpl: The implementation of L{_IKQueue} to use. A
hook for testing.
"""
self._impl = _kqueueImpl
self._kq = self._impl.kqueue()
self._reads = set()
self._writes = set()
self._selectables = {}
posixbase.PosixReactorBase.__init__(self)
def _updateRegistration(self, fd, filter, op):
"""
Private method for changing kqueue registration on a given FD
filtering for events given filter/op. This will never block and
returns nothing.
"""
self._kq.control([self._impl.kevent(fd, filter, op)], 0, 0)
def beforeDaemonize(self):
"""
Implement L{IReactorDaemonize.beforeDaemonize}.
"""
# Twisted-internal method called during daemonization (when application
# is started via twistd). This is called right before the magic double
# forking done for daemonization. We cleanly close the kqueue() and later
# recreate it. This is needed since a) kqueue() are not inherited across
# forks and b) twistd will create the reactor already before daemonization
# (and will also add at least 1 reader to the reactor, an instance of
# twisted.internet.posixbase._UnixWaker).
#
# See: twisted.scripts._twistd_unix.daemonize()
self._kq.close()
self._kq = None
def afterDaemonize(self):
"""
Implement L{IReactorDaemonize.afterDaemonize}.
"""
# Twisted-internal method called during daemonization. This is called right
# after daemonization and recreates the kqueue() and any readers/writers
# that were added before. Note that you MUST NOT call any reactor methods
# in between beforeDaemonize() and afterDaemonize()!
self._kq = self._impl.kqueue()
for fd in self._reads:
self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
for fd in self._writes:
self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
def addReader(self, reader):
"""
Implement L{IReactorFDSet.addReader}.
"""
fd = reader.fileno()
if fd not in self._reads:
try:
self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
except OSError:
pass
finally:
self._selectables[fd] = reader
self._reads.add(fd)
def addWriter(self, writer):
"""
Implement L{IReactorFDSet.addWriter}.
"""
fd = writer.fileno()
if fd not in self._writes:
try:
self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
except OSError:
pass
finally:
self._selectables[fd] = writer
self._writes.add(fd)
def removeReader(self, reader):
"""
Implement L{IReactorFDSet.removeReader}.
"""
wasLost = False
try:
fd = reader.fileno()
except:
fd = -1
if fd == -1:
for fd, fdes in self._selectables.items():
if reader is fdes:
wasLost = True
break
else:
return
if fd in self._reads:
self._reads.remove(fd)
if fd not in self._writes:
del self._selectables[fd]
if not wasLost:
try:
self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE)
except OSError:
pass
def removeWriter(self, writer):
"""
Implement L{IReactorFDSet.removeWriter}.
"""
wasLost = False
try:
fd = writer.fileno()
except:
fd = -1
if fd == -1:
for fd, fdes in self._selectables.items():
if writer is fdes:
wasLost = True
break
else:
return
if fd in self._writes:
self._writes.remove(fd)
if fd not in self._reads:
del self._selectables[fd]
if not wasLost:
try:
self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
except OSError:
pass
def removeAll(self):
"""
Implement L{IReactorFDSet.removeAll}.
"""
return self._removeAll(
[self._selectables[fd] for fd in self._reads],
[self._selectables[fd] for fd in self._writes])
def getReaders(self):
"""
Implement L{IReactorFDSet.getReaders}.
"""
return [self._selectables[fd] for fd in self._reads]
def getWriters(self):
"""
Implement L{IReactorFDSet.getWriters}.
"""
return [self._selectables[fd] for fd in self._writes]
def doKEvent(self, timeout):
"""
Poll the kqueue for new events.
"""
if timeout is None:
timeout = 1
try:
events = self._kq.control([], len(self._selectables), timeout)
except OSError as e:
# Since this command blocks for potentially a while, it's possible
# EINTR can be raised for various reasons (for example, if the user
# hits ^C).
if e.errno == errno.EINTR:
return
else:
raise
_drdw = self._doWriteOrRead
for event in events:
fd = event.ident
try:
selectable = self._selectables[fd]
except KeyError:
# Handles the infrequent case where one selectable's
# handler disconnects another.
continue
else:
log.callWithLogger(selectable, _drdw, selectable, fd, event)
def _doWriteOrRead(self, selectable, fd, event):
"""
Private method called when a FD is ready for reading, writing or was
lost. Do the work and raise errors where necessary.
"""
why = None
inRead = False
(filter, flags, data, fflags) = (
event.filter, event.flags, event.data, event.fflags)
if flags & KQ_EV_EOF and data and fflags:
why = main.CONNECTION_LOST
else:
try:
if selectable.fileno() == -1:
inRead = False
why = posixbase._NO_FILEDESC
else:
if filter == KQ_FILTER_READ:
inRead = True
why = selectable.doRead()
if filter == KQ_FILTER_WRITE:
inRead = False
why = selectable.doWrite()
except:
# Any exception from application code gets logged and will
# cause us to disconnect the selectable.
why = failure.Failure()
log.err(why, "An exception was raised from application code" \
" while processing a reactor selectable")
if why:
self._disconnectSelectable(selectable, why, inRead)
doIteration = doKEvent
def install():
"""
Install the kqueue() reactor.
"""
p = KQueueReactor()
from twisted.internet.main import installReactor
installReactor(p)
__all__ = ["KQueueReactor", "install"]