aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/internet/_pollingfile.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/internet/_pollingfile.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/internet/_pollingfile.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/internet/_pollingfile.py291
1 files changed, 291 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/internet/_pollingfile.py b/contrib/python/Twisted/py3/twisted/internet/_pollingfile.py
new file mode 100644
index 0000000000..758a4cecb7
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/internet/_pollingfile.py
@@ -0,0 +1,291 @@
+# -*- test-case-name: twisted.internet.test.test_pollingfile -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Implements a simple polling interface for file descriptors that don't work with
+select() - this is pretty much only useful on Windows.
+"""
+
+
+from zope.interface import implementer
+
+from twisted.internet.interfaces import IConsumer, IPushProducer
+
+MIN_TIMEOUT = 0.000000001
+MAX_TIMEOUT = 0.1
+
+
+class _PollableResource:
+ active = True
+
+ def activate(self):
+ self.active = True
+
+ def deactivate(self):
+ self.active = False
+
+
+class _PollingTimer:
+ # Everything is private here because it is really an implementation detail.
+
+ def __init__(self, reactor):
+ self.reactor = reactor
+ self._resources = []
+ self._pollTimer = None
+ self._currentTimeout = MAX_TIMEOUT
+ self._paused = False
+
+ def _addPollableResource(self, res):
+ self._resources.append(res)
+ self._checkPollingState()
+
+ def _checkPollingState(self):
+ for resource in self._resources:
+ if resource.active:
+ self._startPolling()
+ break
+ else:
+ self._stopPolling()
+
+ def _startPolling(self):
+ if self._pollTimer is None:
+ self._pollTimer = self._reschedule()
+
+ def _stopPolling(self):
+ if self._pollTimer is not None:
+ self._pollTimer.cancel()
+ self._pollTimer = None
+
+ def _pause(self):
+ self._paused = True
+
+ def _unpause(self):
+ self._paused = False
+ self._checkPollingState()
+
+ def _reschedule(self):
+ if not self._paused:
+ return self.reactor.callLater(self._currentTimeout, self._pollEvent)
+
+ def _pollEvent(self):
+ workUnits = 0.0
+ anyActive = []
+ for resource in self._resources:
+ if resource.active:
+ workUnits += resource.checkWork()
+ # Check AFTER work has been done
+ if resource.active:
+ anyActive.append(resource)
+
+ newTimeout = self._currentTimeout
+ if workUnits:
+ newTimeout = self._currentTimeout / (workUnits + 1.0)
+ if newTimeout < MIN_TIMEOUT:
+ newTimeout = MIN_TIMEOUT
+ else:
+ newTimeout = self._currentTimeout * 2.0
+ if newTimeout > MAX_TIMEOUT:
+ newTimeout = MAX_TIMEOUT
+ self._currentTimeout = newTimeout
+ if anyActive:
+ self._pollTimer = self._reschedule()
+
+
+# If we ever (let's hope not) need the above functionality on UNIX, this could
+# be factored into a different module.
+
+import pywintypes # type: ignore[import]
+import win32api # type: ignore[import]
+import win32file # type: ignore[import]
+import win32pipe # type: ignore[import]
+
+
+@implementer(IPushProducer)
+class _PollableReadPipe(_PollableResource):
+ def __init__(self, pipe, receivedCallback, lostCallback):
+ # security attributes for pipes
+ self.pipe = pipe
+ self.receivedCallback = receivedCallback
+ self.lostCallback = lostCallback
+
+ def checkWork(self):
+ finished = 0
+ fullDataRead = []
+
+ while 1:
+ try:
+ buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
+ # finished = (result == -1)
+ if not bytesToRead:
+ break
+ hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
+ fullDataRead.append(data)
+ except win32api.error:
+ finished = 1
+ break
+
+ dataBuf = b"".join(fullDataRead)
+ if dataBuf:
+ self.receivedCallback(dataBuf)
+ if finished:
+ self.cleanup()
+ return len(dataBuf)
+
+ def cleanup(self):
+ self.deactivate()
+ self.lostCallback()
+
+ def close(self):
+ try:
+ win32api.CloseHandle(self.pipe)
+ except pywintypes.error:
+ # You can't close std handles...?
+ pass
+
+ def stopProducing(self):
+ self.close()
+
+ def pauseProducing(self):
+ self.deactivate()
+
+ def resumeProducing(self):
+ self.activate()
+
+
+FULL_BUFFER_SIZE = 64 * 1024
+
+
+@implementer(IConsumer)
+class _PollableWritePipe(_PollableResource):
+ def __init__(self, writePipe, lostCallback):
+ self.disconnecting = False
+ self.producer = None
+ self.producerPaused = False
+ self.streamingProducer = 0
+ self.outQueue = []
+ self.writePipe = writePipe
+ self.lostCallback = lostCallback
+ try:
+ win32pipe.SetNamedPipeHandleState(
+ writePipe, win32pipe.PIPE_NOWAIT, None, None
+ )
+ except pywintypes.error:
+ # Maybe it's an invalid handle. Who knows.
+ pass
+
+ def close(self):
+ self.disconnecting = True
+
+ def bufferFull(self):
+ if self.producer is not None:
+ self.producerPaused = True
+ self.producer.pauseProducing()
+
+ def bufferEmpty(self):
+ if self.producer is not None and (
+ (not self.streamingProducer) or self.producerPaused
+ ):
+ self.producer.producerPaused = False
+ self.producer.resumeProducing()
+ return True
+ return False
+
+ # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh
+
+ def registerProducer(self, producer, streaming):
+ """Register to receive data from a producer.
+
+ This sets this selectable to be a consumer for a producer. When this
+ selectable runs out of data on a write() call, it will ask the producer
+ to resumeProducing(). A producer should implement the IProducer
+ interface.
+
+ FileDescriptor provides some infrastructure for producer methods.
+ """
+ if self.producer is not None:
+ raise RuntimeError(
+ "Cannot register producer %s, because producer %s was never "
+ "unregistered." % (producer, self.producer)
+ )
+ if not self.active:
+ producer.stopProducing()
+ else:
+ self.producer = producer
+ self.streamingProducer = streaming
+ if not streaming:
+ producer.resumeProducing()
+
+ def unregisterProducer(self):
+ """Stop consuming data from a producer, without disconnecting."""
+ self.producer = None
+
+ def writeConnectionLost(self):
+ self.deactivate()
+ try:
+ win32api.CloseHandle(self.writePipe)
+ except pywintypes.error:
+ # OMG what
+ pass
+ self.lostCallback()
+
+ def writeSequence(self, seq):
+ """
+ Append a C{list} or C{tuple} of bytes to the output buffer.
+
+ @param seq: C{list} or C{tuple} of C{str} instances to be appended to
+ the output buffer.
+
+ @raise TypeError: If C{seq} contains C{unicode}.
+ """
+ if str in map(type, seq):
+ raise TypeError("Unicode not allowed in output buffer.")
+ self.outQueue.extend(seq)
+
+ def write(self, data):
+ """
+ Append some bytes to the output buffer.
+
+ @param data: C{str} to be appended to the output buffer.
+ @type data: C{str}.
+
+ @raise TypeError: If C{data} is C{unicode} instead of C{str}.
+ """
+ if isinstance(data, str):
+ raise TypeError("Unicode not allowed in output buffer.")
+ if self.disconnecting:
+ return
+ self.outQueue.append(data)
+ if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
+ self.bufferFull()
+
+ def checkWork(self):
+ numBytesWritten = 0
+ if not self.outQueue:
+ if self.disconnecting:
+ self.writeConnectionLost()
+ return 0
+ try:
+ win32file.WriteFile(self.writePipe, b"", None)
+ except pywintypes.error:
+ self.writeConnectionLost()
+ return numBytesWritten
+ while self.outQueue:
+ data = self.outQueue.pop(0)
+ errCode = 0
+ try:
+ errCode, nBytesWritten = win32file.WriteFile(self.writePipe, data, None)
+ except win32api.error:
+ self.writeConnectionLost()
+ break
+ else:
+ # assert not errCode, "wtf an error code???"
+ numBytesWritten += nBytesWritten
+ if len(data) > nBytesWritten:
+ self.outQueue.insert(0, data[nBytesWritten:])
+ break
+ else:
+ resumed = self.bufferEmpty()
+ if not resumed and self.disconnecting:
+ self.writeConnectionLost()
+ return numBytesWritten