aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/internet/asyncioreactor.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/internet/asyncioreactor.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/internet/asyncioreactor.py')
-rw-r--r--contrib/python/Twisted/py2/twisted/internet/asyncioreactor.py322
1 files changed, 322 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/internet/asyncioreactor.py b/contrib/python/Twisted/py2/twisted/internet/asyncioreactor.py
new file mode 100644
index 0000000000..a2896fb685
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/internet/asyncioreactor.py
@@ -0,0 +1,322 @@
+# -*- test-case-name: twisted.test.test_internet -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+asyncio-based reactor implementation.
+"""
+
+from __future__ import absolute_import, division
+
+import errno
+
+from zope.interface import implementer
+
+from twisted.logger import Logger
+from twisted.internet.base import DelayedCall
+from twisted.internet.posixbase import (PosixReactorBase, _NO_FILEDESC,
+ _ContinuousPolling)
+from twisted.python.log import callWithLogger
+from twisted.internet.interfaces import IReactorFDSet
+
+try:
+ from asyncio import get_event_loop
+except ImportError:
+ raise ImportError("Requires asyncio.")
+
+# As per ImportError above, this module is never imported on python 2, but
+# pyflakes still runs on python 2, so let's tell it where the errors come from.
+from builtins import PermissionError, BrokenPipeError
+
+
+class _DCHandle(object):
+ """
+ Wraps ephemeral L{asyncio.Handle} instances. Callbacks can close
+ over this and use it as a mutable reference to asyncio C{Handles}.
+
+ @ivar handle: The current L{asyncio.Handle}
+ """
+ def __init__(self, handle):
+ self.handle = handle
+
+
+ def cancel(self):
+ """
+ Cancel the inner L{asyncio.Handle}.
+ """
+ self.handle.cancel()
+
+
+
+@implementer(IReactorFDSet)
+class AsyncioSelectorReactor(PosixReactorBase):
+ """
+ Reactor running on top of L{asyncio.SelectorEventLoop}.
+ """
+ _asyncClosed = False
+ _log = Logger()
+
+ def __init__(self, eventloop=None):
+
+ if eventloop is None:
+ eventloop = get_event_loop()
+
+ self._asyncioEventloop = eventloop
+ self._writers = {}
+ self._readers = {}
+ self._delayedCalls = set()
+ self._continuousPolling = _ContinuousPolling(self)
+ super().__init__()
+
+
+ def _unregisterFDInAsyncio(self, fd):
+ """
+ Compensate for a bug in asyncio where it will not unregister a FD that
+ it cannot handle in the epoll loop. It touches internal asyncio code.
+
+ A description of the bug by markrwilliams:
+
+ The C{add_writer} method of asyncio event loops isn't atomic because
+ all the Selector classes in the selector module internally record a
+ file object before passing it to the platform's selector
+ implementation. If the platform's selector decides the file object
+ isn't acceptable, the resulting exception doesn't cause the Selector to
+ un-track the file object.
+
+ The failing/hanging stdio test goes through the following sequence of
+ events (roughly):
+
+ * The first C{connection.write(intToByte(value))} call hits the asyncio
+ reactor's C{addWriter} method.
+
+ * C{addWriter} calls the asyncio loop's C{add_writer} method, which
+ happens to live on C{_BaseSelectorEventLoop}.
+
+ * The asyncio loop's C{add_writer} method checks if the file object has
+ been registered before via the selector's C{get_key} method.
+
+ * It hasn't, so the KeyError block runs and calls the selector's
+ register method
+
+ * Code examples that follow use EpollSelector, but the code flow holds
+ true for any other selector implementation. The selector's register
+ method first calls through to the next register method in the MRO
+
+ * That next method is always C{_BaseSelectorImpl.register} which
+ creates a C{SelectorKey} instance for the file object, stores it under
+ the file object's file descriptor, and then returns it.
+
+ * Control returns to the concrete selector implementation, which asks
+ the operating system to track the file descriptor using the right API.
+
+ * The operating system refuses! An exception is raised that, in this
+ case, the asyncio reactor handles by creating a C{_ContinuousPolling}
+ object to watch the file descriptor.
+
+ * The second C{connection.write(intToByte(value))} call hits the
+ asyncio reactor's C{addWriter} method, which hits the C{add_writer}
+ method. But the loop's selector's get_key method now returns a
+ C{SelectorKey}! Now the asyncio reactor's C{addWriter} method thinks
+ the asyncio loop will watch the file descriptor, even though it won't.
+ """
+ try:
+ self._asyncioEventloop._selector.unregister(fd)
+ except:
+ pass
+
+
+ def _readOrWrite(self, selectable, read):
+ method = selectable.doRead if read else selectable.doWrite
+
+ if selectable.fileno() == -1:
+ self._disconnectSelectable(selectable, _NO_FILEDESC, read)
+ return
+
+ try:
+ why = method()
+ except Exception as e:
+ why = e
+ self._log.failure(None)
+ if why:
+ self._disconnectSelectable(selectable, why, read)
+
+
+ def addReader(self, reader):
+ if reader in self._readers.keys() or \
+ reader in self._continuousPolling._readers:
+ return
+
+ fd = reader.fileno()
+ try:
+ self._asyncioEventloop.add_reader(fd, callWithLogger, reader,
+ self._readOrWrite, reader,
+ True)
+ self._readers[reader] = fd
+ except IOError as e:
+ self._unregisterFDInAsyncio(fd)
+ if e.errno == errno.EPERM:
+ # epoll(7) doesn't support certain file descriptors,
+ # e.g. filesystem files, so for those we just poll
+ # continuously:
+ self._continuousPolling.addReader(reader)
+ else:
+ raise
+
+
+ def addWriter(self, writer):
+ if writer in self._writers.keys() or \
+ writer in self._continuousPolling._writers:
+ return
+
+ fd = writer.fileno()
+ try:
+ self._asyncioEventloop.add_writer(fd, callWithLogger, writer,
+ self._readOrWrite, writer,
+ False)
+ self._writers[writer] = fd
+ except PermissionError:
+ self._unregisterFDInAsyncio(fd)
+ # epoll(7) doesn't support certain file descriptors,
+ # e.g. filesystem files, so for those we just poll
+ # continuously:
+ self._continuousPolling.addWriter(writer)
+ except BrokenPipeError:
+ # The kqueuereactor will raise this if there is a broken pipe
+ self._unregisterFDInAsyncio(fd)
+ except:
+ self._unregisterFDInAsyncio(fd)
+ raise
+
+
+ def removeReader(self, reader):
+
+ # First, see if they're trying to remove a reader that we don't have.
+ if not (reader in self._readers.keys() \
+ or self._continuousPolling.isReading(reader)):
+ # We don't have it, so just return OK.
+ return
+
+ # If it was a cont. polling reader, check there first.
+ if self._continuousPolling.isReading(reader):
+ self._continuousPolling.removeReader(reader)
+ return
+
+ fd = reader.fileno()
+ if fd == -1:
+ # If the FD is -1, we want to know what its original FD was, to
+ # remove it.
+ fd = self._readers.pop(reader)
+ else:
+ self._readers.pop(reader)
+
+ self._asyncioEventloop.remove_reader(fd)
+
+
+ def removeWriter(self, writer):
+
+ # First, see if they're trying to remove a writer that we don't have.
+ if not (writer in self._writers.keys() \
+ or self._continuousPolling.isWriting(writer)):
+ # We don't have it, so just return OK.
+ return
+
+ # If it was a cont. polling writer, check there first.
+ if self._continuousPolling.isWriting(writer):
+ self._continuousPolling.removeWriter(writer)
+ return
+
+ fd = writer.fileno()
+
+ if fd == -1:
+ # If the FD is -1, we want to know what its original FD was, to
+ # remove it.
+ fd = self._writers.pop(writer)
+ else:
+ self._writers.pop(writer)
+
+ self._asyncioEventloop.remove_writer(fd)
+
+
+ def removeAll(self):
+ return (self._removeAll(self._readers.keys(), self._writers.keys()) +
+ self._continuousPolling.removeAll())
+
+
+ def getReaders(self):
+ return (list(self._readers.keys()) +
+ self._continuousPolling.getReaders())
+
+
+ def getWriters(self):
+ return (list(self._writers.keys()) +
+ self._continuousPolling.getWriters())
+
+
+ def getDelayedCalls(self):
+ return list(self._delayedCalls)
+
+
+ def iterate(self, timeout):
+ self._asyncioEventloop.call_later(timeout + 0.01,
+ self._asyncioEventloop.stop)
+ self._asyncioEventloop.run_forever()
+
+
+ def run(self, installSignalHandlers=True):
+ self.startRunning(installSignalHandlers=installSignalHandlers)
+ self._asyncioEventloop.run_forever()
+ if self._justStopped:
+ self._justStopped = False
+
+
+ def stop(self):
+ super().stop()
+ self.callLater(0, self.fireSystemEvent, "shutdown")
+
+
+ def crash(self):
+ super().crash()
+ self._asyncioEventloop.stop()
+
+
+ def seconds(self):
+ return self._asyncioEventloop.time()
+
+
+ def callLater(self, seconds, f, *args, **kwargs):
+ def run():
+ dc.called = True
+ self._delayedCalls.remove(dc)
+ f(*args, **kwargs)
+ handle = self._asyncioEventloop.call_later(seconds, run)
+ dchandle = _DCHandle(handle)
+
+ def cancel(dc):
+ self._delayedCalls.remove(dc)
+ dchandle.cancel()
+
+ def reset(dc):
+ dchandle.handle = self._asyncioEventloop.call_at(dc.time, run)
+
+ dc = DelayedCall(self.seconds() + seconds, run, (), {},
+ cancel, reset, seconds=self.seconds)
+ self._delayedCalls.add(dc)
+ return dc
+
+
+ def callFromThread(self, f, *args, **kwargs):
+ g = lambda: self.callLater(0, f, *args, **kwargs)
+ self._asyncioEventloop.call_soon_threadsafe(g)
+
+
+
+def install(eventloop=None):
+ """
+ Install an asyncio-based reactor.
+
+ @param eventloop: The asyncio eventloop to wrap. If default, the global one
+ is selected.
+ """
+ reactor = AsyncioSelectorReactor(eventloop)
+ from twisted.internet.main import installReactor
+ installReactor(reactor)