aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/internet/cfreactor.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/internet/cfreactor.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/internet/cfreactor.py')
-rw-r--r--contrib/python/Twisted/py2/twisted/internet/cfreactor.py502
1 files changed, 502 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/internet/cfreactor.py b/contrib/python/Twisted/py2/twisted/internet/cfreactor.py
new file mode 100644
index 0000000000..d161807439
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/internet/cfreactor.py
@@ -0,0 +1,502 @@
+# -*- test-case-name: twisted.internet.test.test_core -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+A reactor for integrating with U{CFRunLoop<http://bit.ly/cfrunloop>}, the
+CoreFoundation main loop used by macOS.
+
+This is useful for integrating Twisted with U{PyObjC<http://pyobjc.sf.net/>}
+applications.
+"""
+
+__all__ = [
+ 'install',
+ 'CFReactor'
+]
+
+import sys
+
+from zope.interface import implementer
+
+from twisted.internet.interfaces import IReactorFDSet
+from twisted.internet.posixbase import PosixReactorBase, _Waker
+from twisted.internet.posixbase import _NO_FILEDESC
+
+from twisted.python import log
+
+from CoreFoundation import (
+ CFRunLoopAddSource, CFRunLoopRemoveSource, CFRunLoopGetMain, CFRunLoopRun,
+ CFRunLoopStop, CFRunLoopTimerCreate, CFRunLoopAddTimer,
+ CFRunLoopTimerInvalidate, kCFAllocatorDefault, kCFRunLoopCommonModes,
+ CFAbsoluteTimeGetCurrent)
+
+from CFNetwork import (
+ CFSocketCreateWithNative, CFSocketSetSocketFlags, CFSocketEnableCallBacks,
+ CFSocketCreateRunLoopSource, CFSocketDisableCallBacks, CFSocketInvalidate,
+ kCFSocketWriteCallBack, kCFSocketReadCallBack, kCFSocketConnectCallBack,
+ kCFSocketAutomaticallyReenableReadCallBack,
+ kCFSocketAutomaticallyReenableWriteCallBack)
+
+
+_READ = 0
+_WRITE = 1
+_preserveSOError = 1 << 6
+
+
+class _WakerPlus(_Waker):
+ """
+ The normal Twisted waker will simply wake up the main loop, which causes an
+ iteration to run, which in turn causes L{ReactorBase.runUntilCurrent}
+ to get invoked.
+
+ L{CFReactor} has a slightly different model of iteration, though: rather
+ than have each iteration process the thread queue, then timed calls, then
+ file descriptors, each callback is run as it is dispatched by the CFRunLoop
+ observer which triggered it.
+
+ So this waker needs to not only unblock the loop, but also make sure the
+ work gets done; so, it reschedules the invocation of C{runUntilCurrent} to
+ be immediate (0 seconds from now) even if there is no timed call work to
+ do.
+ """
+
+ def doRead(self):
+ """
+ Wake up the loop and force C{runUntilCurrent} to run immediately in the
+ next timed iteration.
+ """
+ result = _Waker.doRead(self)
+ self.reactor._scheduleSimulate(True)
+ return result
+
+
+
+@implementer(IReactorFDSet)
+class CFReactor(PosixReactorBase):
+ """
+ The CoreFoundation reactor.
+
+ You probably want to use this via the L{install} API.
+
+ @ivar _fdmap: a dictionary, mapping an integer (a file descriptor) to a
+ 4-tuple of:
+
+ - source: a C{CFRunLoopSource}; the source associated with this
+ socket.
+ - socket: a C{CFSocket} wrapping the file descriptor.
+ - descriptor: an L{IReadDescriptor} and/or L{IWriteDescriptor}
+ provider.
+ - read-write: a 2-C{list} of booleans: respectively, whether this
+ descriptor is currently registered for reading or registered for
+ writing.
+
+ @ivar _idmap: a dictionary, mapping the id() of an L{IReadDescriptor} or
+ L{IWriteDescriptor} to a C{fd} in L{_fdmap}. Implemented in this
+ manner so that we don't have to rely (even more) on the hashability of
+ L{IReadDescriptor} providers, and we know that they won't be collected
+ since these are kept in sync with C{_fdmap}. Necessary because the
+ .fileno() of a file descriptor may change at will, so we need to be
+ able to look up what its file descriptor I{used} to be, so that we can
+ look it up in C{_fdmap}
+
+ @ivar _cfrunloop: the C{CFRunLoop} pyobjc object wrapped
+ by this reactor.
+
+ @ivar _inCFLoop: Is C{CFRunLoopRun} currently running?
+
+ @type _inCFLoop: L{bool}
+
+ @ivar _currentSimulator: if a CFTimer is currently scheduled with the CF
+ run loop to run Twisted callLater calls, this is a reference to it.
+ Otherwise, it is L{None}
+ """
+ def __init__(self, runLoop=None, runner=None):
+ self._fdmap = {}
+ self._idmap = {}
+ if runner is None:
+ runner = CFRunLoopRun
+ self._runner = runner
+
+ if runLoop is None:
+ runLoop = CFRunLoopGetMain()
+ self._cfrunloop = runLoop
+ PosixReactorBase.__init__(self)
+
+
+ def installWaker(self):
+ """
+ Override C{installWaker} in order to use L{_WakerPlus}; otherwise this
+ should be exactly the same as the parent implementation.
+ """
+ if not self.waker:
+ self.waker = _WakerPlus(self)
+ self._internalReaders.add(self.waker)
+ self.addReader(self.waker)
+
+
+ def _socketCallback(self, cfSocket, callbackType,
+ ignoredAddress, ignoredData, context):
+ """
+ The socket callback issued by CFRunLoop. This will issue C{doRead} or
+ C{doWrite} calls to the L{IReadDescriptor} and L{IWriteDescriptor}
+ registered with the file descriptor that we are being notified of.
+
+ @param cfSocket: The C{CFSocket} which has got some activity.
+
+ @param callbackType: The type of activity that we are being notified
+ of. Either C{kCFSocketReadCallBack} or C{kCFSocketWriteCallBack}.
+
+ @param ignoredAddress: Unused, because this is not used for either of
+ the callback types we register for.
+
+ @param ignoredData: Unused, because this is not used for either of the
+ callback types we register for.
+
+ @param context: The data associated with this callback by
+ C{CFSocketCreateWithNative} (in C{CFReactor._watchFD}). A 2-tuple
+ of C{(int, CFRunLoopSource)}.
+ """
+ (fd, smugglesrc) = context
+ if fd not in self._fdmap:
+ # Spurious notifications seem to be generated sometimes if you
+ # CFSocketDisableCallBacks in the middle of an event. I don't know
+ # about this FD, any more, so let's get rid of it.
+ CFRunLoopRemoveSource(
+ self._cfrunloop, smugglesrc, kCFRunLoopCommonModes
+ )
+ return
+
+ src, skt, readWriteDescriptor, rw = self._fdmap[fd]
+
+ def _drdw():
+ why = None
+ isRead = False
+
+ try:
+ if readWriteDescriptor.fileno() == -1:
+ why = _NO_FILEDESC
+ else:
+ isRead = callbackType == kCFSocketReadCallBack
+ # CFSocket seems to deliver duplicate read/write
+ # notifications sometimes, especially a duplicate
+ # writability notification when first registering the
+ # socket. This bears further investigation, since I may
+ # have been mis-interpreting the behavior I was seeing.
+ # (Running the full Twisted test suite, while thorough, is
+ # not always entirely clear.) Until this has been more
+ # thoroughly investigated , we consult our own
+ # reading/writing state flags to determine whether we
+ # should actually attempt a doRead/doWrite first. -glyph
+ if isRead:
+ if rw[_READ]:
+ why = readWriteDescriptor.doRead()
+ else:
+ if rw[_WRITE]:
+ why = readWriteDescriptor.doWrite()
+ except:
+ why = sys.exc_info()[1]
+ log.err()
+ if why:
+ self._disconnectSelectable(readWriteDescriptor, why, isRead)
+
+ log.callWithLogger(readWriteDescriptor, _drdw)
+
+
+ def _watchFD(self, fd, descr, flag):
+ """
+ Register a file descriptor with the C{CFRunLoop}, or modify its state
+ so that it's listening for both notifications (read and write) rather
+ than just one; used to implement C{addReader} and C{addWriter}.
+
+ @param fd: The file descriptor.
+
+ @type fd: L{int}
+
+ @param descr: the L{IReadDescriptor} or L{IWriteDescriptor}
+
+ @param flag: the flag to register for callbacks on, either
+ C{kCFSocketReadCallBack} or C{kCFSocketWriteCallBack}
+ """
+ if fd == -1:
+ raise RuntimeError("Invalid file descriptor.")
+ if fd in self._fdmap:
+ src, cfs, gotdescr, rw = self._fdmap[fd]
+ # do I need to verify that it's the same descr?
+ else:
+ ctx = []
+ ctx.append(fd)
+ cfs = CFSocketCreateWithNative(
+ kCFAllocatorDefault, fd,
+ kCFSocketReadCallBack | kCFSocketWriteCallBack |
+ kCFSocketConnectCallBack,
+ self._socketCallback, ctx
+ )
+ CFSocketSetSocketFlags(
+ cfs,
+ kCFSocketAutomaticallyReenableReadCallBack |
+ kCFSocketAutomaticallyReenableWriteCallBack |
+
+ # This extra flag is to ensure that CF doesn't (destructively,
+ # because destructively is the only way to do it) retrieve
+ # SO_ERROR and thereby break twisted.internet.tcp.BaseClient,
+ # which needs SO_ERROR to tell it whether or not it needs to
+ # call connect_ex a second time.
+ _preserveSOError
+ )
+ src = CFSocketCreateRunLoopSource(kCFAllocatorDefault, cfs, 0)
+ ctx.append(src)
+ CFRunLoopAddSource(self._cfrunloop, src, kCFRunLoopCommonModes)
+ CFSocketDisableCallBacks(
+ cfs,
+ kCFSocketReadCallBack | kCFSocketWriteCallBack |
+ kCFSocketConnectCallBack
+ )
+ rw = [False, False]
+ self._idmap[id(descr)] = fd
+ self._fdmap[fd] = src, cfs, descr, rw
+ rw[self._flag2idx(flag)] = True
+ CFSocketEnableCallBacks(cfs, flag)
+
+
+ def _flag2idx(self, flag):
+ """
+ Convert a C{kCFSocket...} constant to an index into the read/write
+ state list (C{_READ} or C{_WRITE}) (the 4th element of the value of
+ C{self._fdmap}).
+
+ @param flag: C{kCFSocketReadCallBack} or C{kCFSocketWriteCallBack}
+
+ @return: C{_READ} or C{_WRITE}
+ """
+ return {kCFSocketReadCallBack: _READ,
+ kCFSocketWriteCallBack: _WRITE}[flag]
+
+
+ def _unwatchFD(self, fd, descr, flag):
+ """
+ Unregister a file descriptor with the C{CFRunLoop}, or modify its state
+ so that it's listening for only one notification (read or write) as
+ opposed to both; used to implement C{removeReader} and C{removeWriter}.
+
+ @param fd: a file descriptor
+
+ @type fd: C{int}
+
+ @param descr: an L{IReadDescriptor} or L{IWriteDescriptor}
+
+ @param flag: C{kCFSocketWriteCallBack} C{kCFSocketReadCallBack}
+ """
+ if id(descr) not in self._idmap:
+ return
+ if fd == -1:
+ # need to deal with it in this case, I think.
+ realfd = self._idmap[id(descr)]
+ else:
+ realfd = fd
+ src, cfs, descr, rw = self._fdmap[realfd]
+ CFSocketDisableCallBacks(cfs, flag)
+ rw[self._flag2idx(flag)] = False
+ if not rw[_READ] and not rw[_WRITE]:
+ del self._idmap[id(descr)]
+ del self._fdmap[realfd]
+ CFRunLoopRemoveSource(self._cfrunloop, src, kCFRunLoopCommonModes)
+ CFSocketInvalidate(cfs)
+
+
+ def addReader(self, reader):
+ """
+ Implement L{IReactorFDSet.addReader}.
+ """
+ self._watchFD(reader.fileno(), reader, kCFSocketReadCallBack)
+
+
+ def addWriter(self, writer):
+ """
+ Implement L{IReactorFDSet.addWriter}.
+ """
+ self._watchFD(writer.fileno(), writer, kCFSocketWriteCallBack)
+
+
+ def removeReader(self, reader):
+ """
+ Implement L{IReactorFDSet.removeReader}.
+ """
+ self._unwatchFD(reader.fileno(), reader, kCFSocketReadCallBack)
+
+
+ def removeWriter(self, writer):
+ """
+ Implement L{IReactorFDSet.removeWriter}.
+ """
+ self._unwatchFD(writer.fileno(), writer, kCFSocketWriteCallBack)
+
+
+ def removeAll(self):
+ """
+ Implement L{IReactorFDSet.removeAll}.
+ """
+ allDesc = set([descr for src, cfs, descr, rw in self._fdmap.values()])
+ allDesc -= set(self._internalReaders)
+ for desc in allDesc:
+ self.removeReader(desc)
+ self.removeWriter(desc)
+ return list(allDesc)
+
+
+ def getReaders(self):
+ """
+ Implement L{IReactorFDSet.getReaders}.
+ """
+ return [descr for src, cfs, descr, rw in self._fdmap.values()
+ if rw[_READ]]
+
+
+ def getWriters(self):
+ """
+ Implement L{IReactorFDSet.getWriters}.
+ """
+ return [descr for src, cfs, descr, rw in self._fdmap.values()
+ if rw[_WRITE]]
+
+
+ def _moveCallLaterSooner(self, tple):
+ """
+ Override L{PosixReactorBase}'s implementation of L{IDelayedCall.reset}
+ so that it will immediately reschedule. Normally
+ C{_moveCallLaterSooner} depends on the fact that C{runUntilCurrent} is
+ always run before the mainloop goes back to sleep, so this forces it to
+ immediately recompute how long the loop needs to stay asleep.
+ """
+ result = PosixReactorBase._moveCallLaterSooner(self, tple)
+ self._scheduleSimulate()
+ return result
+
+
+ _inCFLoop = False
+
+ def mainLoop(self):
+ """
+ Run the runner (C{CFRunLoopRun} or something that calls it), which runs
+ the run loop until C{crash()} is called.
+ """
+ self._inCFLoop = True
+ try:
+ self._runner()
+ finally:
+ self._inCFLoop = False
+
+
+ _currentSimulator = None
+
+ def _scheduleSimulate(self, force=False):
+ """
+ Schedule a call to C{self.runUntilCurrent}. This will cancel the
+ currently scheduled call if it is already scheduled.
+
+ @param force: Even if there are no timed calls, make sure that
+ C{runUntilCurrent} runs immediately (in a 0-seconds-from-now
+ C{CFRunLoopTimer}). This is necessary for calls which need to
+ trigger behavior of C{runUntilCurrent} other than running timed
+ calls, such as draining the thread call queue or calling C{crash()}
+ when the appropriate flags are set.
+
+ @type force: C{bool}
+ """
+ if self._currentSimulator is not None:
+ CFRunLoopTimerInvalidate(self._currentSimulator)
+ self._currentSimulator = None
+ timeout = self.timeout()
+ if force:
+ timeout = 0.0
+ if timeout is not None:
+ fireDate = (CFAbsoluteTimeGetCurrent() + timeout)
+ def simulate(cftimer, extra):
+ self._currentSimulator = None
+ self.runUntilCurrent()
+ self._scheduleSimulate()
+ c = self._currentSimulator = CFRunLoopTimerCreate(
+ kCFAllocatorDefault, fireDate,
+ 0, 0, 0, simulate, None
+ )
+ CFRunLoopAddTimer(self._cfrunloop, c, kCFRunLoopCommonModes)
+
+
+ def callLater(self, _seconds, _f, *args, **kw):
+ """
+ Implement L{IReactorTime.callLater}.
+ """
+ delayedCall = PosixReactorBase.callLater(
+ self, _seconds, _f, *args, **kw
+ )
+ self._scheduleSimulate()
+ return delayedCall
+
+
+ def stop(self):
+ """
+ Implement L{IReactorCore.stop}.
+ """
+ PosixReactorBase.stop(self)
+ self._scheduleSimulate(True)
+
+
+ def crash(self):
+ """
+ Implement L{IReactorCore.crash}
+ """
+ wasStarted = self._started
+ PosixReactorBase.crash(self)
+ if self._inCFLoop:
+ self._stopNow()
+ else:
+ if wasStarted:
+ self.callLater(0, self._stopNow)
+
+
+ def _stopNow(self):
+ """
+ Immediately stop the CFRunLoop (which must be running!).
+ """
+ CFRunLoopStop(self._cfrunloop)
+
+
+ def iterate(self, delay=0):
+ """
+ Emulate the behavior of C{iterate()} for things that want to call it,
+ by letting the loop run for a little while and then scheduling a timed
+ call to exit it.
+ """
+ self.callLater(delay, self._stopNow)
+ self.mainLoop()
+
+
+
+def install(runLoop=None, runner=None):
+ """
+ Configure the twisted mainloop to be run inside CFRunLoop.
+
+ @param runLoop: the run loop to use.
+
+ @param runner: the function to call in order to actually invoke the main
+ loop. This will default to C{CFRunLoopRun} if not specified. However,
+ this is not an appropriate choice for GUI applications, as you need to
+ run NSApplicationMain (or something like it). For example, to run the
+ Twisted mainloop in a PyObjC application, your C{main.py} should look
+ something like this::
+
+ from PyObjCTools import AppHelper
+ from twisted.internet.cfreactor import install
+ install(runner=AppHelper.runEventLoop)
+ # initialize your application
+ reactor.run()
+
+ @return: The installed reactor.
+
+ @rtype: C{CFReactor}
+ """
+
+ reactor = CFReactor(runLoop=runLoop, runner=runner)
+ from twisted.internet.main import installReactor
+ installReactor(reactor)
+ return reactor