aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/protocols/pcp.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/protocols/pcp.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/protocols/pcp.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/protocols/pcp.py211
1 files changed, 211 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/protocols/pcp.py b/contrib/python/Twisted/py3/twisted/protocols/pcp.py
new file mode 100644
index 0000000000..978ec64d6d
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/protocols/pcp.py
@@ -0,0 +1,211 @@
+# -*- test-case-name: twisted.test.test_pcp -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Producer-Consumer Proxy.
+"""
+
+from zope.interface import implementer
+
+from twisted.internet import interfaces
+
+
+@implementer(interfaces.IProducer, interfaces.IConsumer)
+class BasicProducerConsumerProxy:
+ """
+ I can act as a man in the middle between any Producer and Consumer.
+
+ @ivar producer: the Producer I subscribe to.
+ @type producer: L{IProducer<interfaces.IProducer>}
+ @ivar consumer: the Consumer I publish to.
+ @type consumer: L{IConsumer<interfaces.IConsumer>}
+ @ivar paused: As a Producer, am I paused?
+ @type paused: bool
+ """
+
+ consumer = None
+ producer = None
+ producerIsStreaming = None
+ iAmStreaming = True
+ outstandingPull = False
+ paused = False
+ stopped = False
+
+ def __init__(self, consumer):
+ self._buffer = []
+ if consumer is not None:
+ self.consumer = consumer
+ consumer.registerProducer(self, self.iAmStreaming)
+
+ # Producer methods:
+
+ def pauseProducing(self):
+ self.paused = True
+ if self.producer:
+ self.producer.pauseProducing()
+
+ def resumeProducing(self):
+ self.paused = False
+ if self._buffer:
+ # TODO: Check to see if consumer supports writeSeq.
+ self.consumer.write("".join(self._buffer))
+ self._buffer[:] = []
+ else:
+ if not self.iAmStreaming:
+ self.outstandingPull = True
+
+ if self.producer is not None:
+ self.producer.resumeProducing()
+
+ def stopProducing(self):
+ if self.producer is not None:
+ self.producer.stopProducing()
+ if self.consumer is not None:
+ del self.consumer
+
+ # Consumer methods:
+
+ def write(self, data):
+ if self.paused or (not self.iAmStreaming and not self.outstandingPull):
+ # We could use that fifo queue here.
+ self._buffer.append(data)
+
+ elif self.consumer is not None:
+ self.consumer.write(data)
+ self.outstandingPull = False
+
+ def finish(self):
+ if self.consumer is not None:
+ self.consumer.finish()
+ self.unregisterProducer()
+
+ def registerProducer(self, producer, streaming):
+ self.producer = producer
+ self.producerIsStreaming = streaming
+
+ def unregisterProducer(self):
+ if self.producer is not None:
+ del self.producer
+ del self.producerIsStreaming
+ if self.consumer:
+ self.consumer.unregisterProducer()
+
+ def __repr__(self) -> str:
+ return f"<{self.__class__}@{id(self):x} around {self.consumer}>"
+
+
+class ProducerConsumerProxy(BasicProducerConsumerProxy):
+ """ProducerConsumerProxy with a finite buffer.
+
+ When my buffer fills up, I have my parent Producer pause until my buffer
+ has room in it again.
+ """
+
+ # Copies much from abstract.FileDescriptor
+ bufferSize = 2**2**2**2
+
+ producerPaused = False
+ unregistered = False
+
+ def pauseProducing(self):
+ # Does *not* call up to ProducerConsumerProxy to relay the pause
+ # message through to my parent Producer.
+ self.paused = True
+
+ def resumeProducing(self):
+ self.paused = False
+ if self._buffer:
+ data = "".join(self._buffer)
+ bytesSent = self._writeSomeData(data)
+ if bytesSent < len(data):
+ unsent = data[bytesSent:]
+ assert (
+ not self.iAmStreaming
+ ), "Streaming producer did not write all its data."
+ self._buffer[:] = [unsent]
+ else:
+ self._buffer[:] = []
+ else:
+ bytesSent = 0
+
+ if (
+ self.unregistered
+ and bytesSent
+ and not self._buffer
+ and self.consumer is not None
+ ):
+ self.consumer.unregisterProducer()
+
+ if not self.iAmStreaming:
+ self.outstandingPull = not bytesSent
+
+ if self.producer is not None:
+ bytesBuffered = sum(len(s) for s in self._buffer)
+ # TODO: You can see here the potential for high and low
+ # watermarks, where bufferSize would be the high mark when we
+ # ask the upstream producer to pause, and we wouldn't have
+ # it resume again until it hit the low mark. Or if producer
+ # is Pull, maybe we'd like to pull from it as much as necessary
+ # to keep our buffer full to the low mark, so we're never caught
+ # without something to send.
+ if self.producerPaused and (bytesBuffered < self.bufferSize):
+ # Now that our buffer is empty,
+ self.producerPaused = False
+ self.producer.resumeProducing()
+ elif self.outstandingPull:
+ # I did not have any data to write in response to a pull,
+ # so I'd better pull some myself.
+ self.producer.resumeProducing()
+
+ def write(self, data):
+ if self.paused or (not self.iAmStreaming and not self.outstandingPull):
+ # We could use that fifo queue here.
+ self._buffer.append(data)
+
+ elif self.consumer is not None:
+ assert (
+ not self._buffer
+ ), "Writing fresh data to consumer before my buffer is empty!"
+ # I'm going to use _writeSomeData here so that there is only one
+ # path to self.consumer.write. But it doesn't actually make sense,
+ # if I am streaming, for some data to not be all data. But maybe I
+ # am not streaming, but I am writing here anyway, because there was
+ # an earlier request for data which was not answered.
+ bytesSent = self._writeSomeData(data)
+ self.outstandingPull = False
+ if not bytesSent == len(data):
+ assert (
+ not self.iAmStreaming
+ ), "Streaming producer did not write all its data."
+ self._buffer.append(data[bytesSent:])
+
+ if (self.producer is not None) and self.producerIsStreaming:
+ bytesBuffered = sum(len(s) for s in self._buffer)
+ if bytesBuffered >= self.bufferSize:
+ self.producer.pauseProducing()
+ self.producerPaused = True
+
+ def registerProducer(self, producer, streaming):
+ self.unregistered = False
+ BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
+ if not streaming:
+ producer.resumeProducing()
+
+ def unregisterProducer(self):
+ if self.producer is not None:
+ del self.producer
+ del self.producerIsStreaming
+ self.unregistered = True
+ if self.consumer and not self._buffer:
+ self.consumer.unregisterProducer()
+
+ def _writeSomeData(self, data):
+ """Write as much of this data as possible.
+
+ @returns: The number of bytes written.
+ """
+ if self.consumer is None:
+ return 0
+ self.consumer.write(data)
+ return len(data)