aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/asyncio/streams.py
diff options
context:
space:
mode:
authornkozlovskiy <nmk@ydb.tech>2023-09-29 12:24:06 +0300
committernkozlovskiy <nmk@ydb.tech>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/tools/python3/src/Lib/asyncio/streams.py
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
downloadydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz
add ydb deps
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/streams.py')
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/streams.py751
1 files changed, 751 insertions, 0 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/streams.py b/contrib/tools/python3/src/Lib/asyncio/streams.py
new file mode 100644
index 0000000000..3d577f1270
--- /dev/null
+++ b/contrib/tools/python3/src/Lib/asyncio/streams.py
@@ -0,0 +1,751 @@
+__all__ = (
+ 'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
+ 'open_connection', 'start_server')
+
+import collections
+import socket
+import sys
+import warnings
+import weakref
+
+if hasattr(socket, 'AF_UNIX'):
+ __all__ += ('open_unix_connection', 'start_unix_server')
+
+from . import coroutines
+from . import events
+from . import exceptions
+from . import format_helpers
+from . import protocols
+from .log import logger
+from .tasks import sleep
+
+
+_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
+
+
+async def open_connection(host=None, port=None, *,
+ limit=_DEFAULT_LIMIT, **kwds):
+ """A wrapper for create_connection() returning a (reader, writer) pair.
+
+ The reader returned is a StreamReader instance; the writer is a
+ StreamWriter instance.
+
+ The arguments are all the usual arguments to create_connection()
+ except protocol_factory; most common are positional host and port,
+ with various optional keyword arguments following.
+
+ Additional optional keyword arguments are loop (to set the event loop
+ instance to use) and limit (to set the buffer limit passed to the
+ StreamReader).
+
+ (If you want to customize the StreamReader and/or
+ StreamReaderProtocol classes, just copy the code -- there's
+ really nothing special here except some convenience.)
+ """
+ loop = events.get_running_loop()
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader, loop=loop)
+ transport, _ = await loop.create_connection(
+ lambda: protocol, host, port, **kwds)
+ writer = StreamWriter(transport, protocol, reader, loop)
+ return reader, writer
+
+
+async def start_server(client_connected_cb, host=None, port=None, *,
+ limit=_DEFAULT_LIMIT, **kwds):
+ """Start a socket server, call back for each client connected.
+
+ The first parameter, `client_connected_cb`, takes two parameters:
+ client_reader, client_writer. client_reader is a StreamReader
+ object, while client_writer is a StreamWriter object. This
+ parameter can either be a plain callback function or a coroutine;
+ if it is a coroutine, it will be automatically converted into a
+ Task.
+
+ The rest of the arguments are all the usual arguments to
+ loop.create_server() except protocol_factory; most common are
+ positional host and port, with various optional keyword arguments
+ following. The return value is the same as loop.create_server().
+
+ Additional optional keyword arguments are loop (to set the event loop
+ instance to use) and limit (to set the buffer limit passed to the
+ StreamReader).
+
+ The return value is the same as loop.create_server(), i.e. a
+ Server object which can be used to stop the service.
+ """
+ loop = events.get_running_loop()
+
+ def factory():
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader, client_connected_cb,
+ loop=loop)
+ return protocol
+
+ return await loop.create_server(factory, host, port, **kwds)
+
+
+if hasattr(socket, 'AF_UNIX'):
+ # UNIX Domain Sockets are supported on this platform
+
+ async def open_unix_connection(path=None, *,
+ limit=_DEFAULT_LIMIT, **kwds):
+ """Similar to `open_connection` but works with UNIX Domain Sockets."""
+ loop = events.get_running_loop()
+
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader, loop=loop)
+ transport, _ = await loop.create_unix_connection(
+ lambda: protocol, path, **kwds)
+ writer = StreamWriter(transport, protocol, reader, loop)
+ return reader, writer
+
+ async def start_unix_server(client_connected_cb, path=None, *,
+ limit=_DEFAULT_LIMIT, **kwds):
+ """Similar to `start_server` but works with UNIX Domain Sockets."""
+ loop = events.get_running_loop()
+
+ def factory():
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader, client_connected_cb,
+ loop=loop)
+ return protocol
+
+ return await loop.create_unix_server(factory, path, **kwds)
+
+
+class FlowControlMixin(protocols.Protocol):
+ """Reusable flow control logic for StreamWriter.drain().
+
+ This implements the protocol methods pause_writing(),
+ resume_writing() and connection_lost(). If the subclass overrides
+ these it must call the super methods.
+
+ StreamWriter.drain() must wait for _drain_helper() coroutine.
+ """
+
+ def __init__(self, loop=None):
+ if loop is None:
+ self._loop = events._get_event_loop(stacklevel=4)
+ else:
+ self._loop = loop
+ self._paused = False
+ self._drain_waiters = collections.deque()
+ self._connection_lost = False
+
+ def pause_writing(self):
+ assert not self._paused
+ self._paused = True
+ if self._loop.get_debug():
+ logger.debug("%r pauses writing", self)
+
+ def resume_writing(self):
+ assert self._paused
+ self._paused = False
+ if self._loop.get_debug():
+ logger.debug("%r resumes writing", self)
+
+ for waiter in self._drain_waiters:
+ if not waiter.done():
+ waiter.set_result(None)
+
+ def connection_lost(self, exc):
+ self._connection_lost = True
+ # Wake up the writer(s) if currently paused.
+ if not self._paused:
+ return
+
+ for waiter in self._drain_waiters:
+ if not waiter.done():
+ if exc is None:
+ waiter.set_result(None)
+ else:
+ waiter.set_exception(exc)
+
+ async def _drain_helper(self):
+ if self._connection_lost:
+ raise ConnectionResetError('Connection lost')
+ if not self._paused:
+ return
+ waiter = self._loop.create_future()
+ self._drain_waiters.append(waiter)
+ try:
+ await waiter
+ finally:
+ self._drain_waiters.remove(waiter)
+
+ def _get_close_waiter(self, stream):
+ raise NotImplementedError
+
+
+class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
+ """Helper class to adapt between Protocol and StreamReader.
+
+ (This is a helper class instead of making StreamReader itself a
+ Protocol subclass, because the StreamReader has other potential
+ uses, and to prevent the user of the StreamReader to accidentally
+ call inappropriate methods of the protocol.)
+ """
+
+ _source_traceback = None
+
+ def __init__(self, stream_reader, client_connected_cb=None, loop=None):
+ super().__init__(loop=loop)
+ if stream_reader is not None:
+ self._stream_reader_wr = weakref.ref(stream_reader)
+ self._source_traceback = stream_reader._source_traceback
+ else:
+ self._stream_reader_wr = None
+ if client_connected_cb is not None:
+ # This is a stream created by the `create_server()` function.
+ # Keep a strong reference to the reader until a connection
+ # is established.
+ self._strong_reader = stream_reader
+ self._reject_connection = False
+ self._stream_writer = None
+ self._task = None
+ self._transport = None
+ self._client_connected_cb = client_connected_cb
+ self._over_ssl = False
+ self._closed = self._loop.create_future()
+
+ @property
+ def _stream_reader(self):
+ if self._stream_reader_wr is None:
+ return None
+ return self._stream_reader_wr()
+
+ def _replace_writer(self, writer):
+ loop = self._loop
+ transport = writer.transport
+ self._stream_writer = writer
+ self._transport = transport
+ self._over_ssl = transport.get_extra_info('sslcontext') is not None
+
+ def connection_made(self, transport):
+ if self._reject_connection:
+ context = {
+ 'message': ('An open stream was garbage collected prior to '
+ 'establishing network connection; '
+ 'call "stream.close()" explicitly.')
+ }
+ if self._source_traceback:
+ context['source_traceback'] = self._source_traceback
+ self._loop.call_exception_handler(context)
+ transport.abort()
+ return
+ self._transport = transport
+ reader = self._stream_reader
+ if reader is not None:
+ reader.set_transport(transport)
+ self._over_ssl = transport.get_extra_info('sslcontext') is not None
+ if self._client_connected_cb is not None:
+ self._stream_writer = StreamWriter(transport, self,
+ reader,
+ self._loop)
+ res = self._client_connected_cb(reader,
+ self._stream_writer)
+ if coroutines.iscoroutine(res):
+ self._task = self._loop.create_task(res)
+ self._strong_reader = None
+
+ def connection_lost(self, exc):
+ reader = self._stream_reader
+ if reader is not None:
+ if exc is None:
+ reader.feed_eof()
+ else:
+ reader.set_exception(exc)
+ if not self._closed.done():
+ if exc is None:
+ self._closed.set_result(None)
+ else:
+ self._closed.set_exception(exc)
+ super().connection_lost(exc)
+ self._stream_reader_wr = None
+ self._stream_writer = None
+ self._task = None
+ self._transport = None
+
+ def data_received(self, data):
+ reader = self._stream_reader
+ if reader is not None:
+ reader.feed_data(data)
+
+ def eof_received(self):
+ reader = self._stream_reader
+ if reader is not None:
+ reader.feed_eof()
+ if self._over_ssl:
+ # Prevent a warning in SSLProtocol.eof_received:
+ # "returning true from eof_received()
+ # has no effect when using ssl"
+ return False
+ return True
+
+ def _get_close_waiter(self, stream):
+ return self._closed
+
+ def __del__(self):
+ # Prevent reports about unhandled exceptions.
+ # Better than self._closed._log_traceback = False hack
+ try:
+ closed = self._closed
+ except AttributeError:
+ pass # failed constructor
+ else:
+ if closed.done() and not closed.cancelled():
+ closed.exception()
+
+
+class StreamWriter:
+ """Wraps a Transport.
+
+ This exposes write(), writelines(), [can_]write_eof(),
+ get_extra_info() and close(). It adds drain() which returns an
+ optional Future on which you can wait for flow control. It also
+ adds a transport property which references the Transport
+ directly.
+ """
+
+ def __init__(self, transport, protocol, reader, loop):
+ self._transport = transport
+ self._protocol = protocol
+ # drain() expects that the reader has an exception() method
+ assert reader is None or isinstance(reader, StreamReader)
+ self._reader = reader
+ self._loop = loop
+ self._complete_fut = self._loop.create_future()
+ self._complete_fut.set_result(None)
+
+ def __repr__(self):
+ info = [self.__class__.__name__, f'transport={self._transport!r}']
+ if self._reader is not None:
+ info.append(f'reader={self._reader!r}')
+ return '<{}>'.format(' '.join(info))
+
+ @property
+ def transport(self):
+ return self._transport
+
+ def write(self, data):
+ self._transport.write(data)
+
+ def writelines(self, data):
+ self._transport.writelines(data)
+
+ def write_eof(self):
+ return self._transport.write_eof()
+
+ def can_write_eof(self):
+ return self._transport.can_write_eof()
+
+ def close(self):
+ return self._transport.close()
+
+ def is_closing(self):
+ return self._transport.is_closing()
+
+ async def wait_closed(self):
+ await self._protocol._get_close_waiter(self)
+
+ def get_extra_info(self, name, default=None):
+ return self._transport.get_extra_info(name, default)
+
+ async def drain(self):
+ """Flush the write buffer.
+
+ The intended use is to write
+
+ w.write(data)
+ await w.drain()
+ """
+ if self._reader is not None:
+ exc = self._reader.exception()
+ if exc is not None:
+ raise exc
+ if self._transport.is_closing():
+ # Wait for protocol.connection_lost() call
+ # Raise connection closing error if any,
+ # ConnectionResetError otherwise
+ # Yield to the event loop so connection_lost() may be
+ # called. Without this, _drain_helper() would return
+ # immediately, and code that calls
+ # write(...); await drain()
+ # in a loop would never call connection_lost(), so it
+ # would not see an error when the socket is closed.
+ await sleep(0)
+ await self._protocol._drain_helper()
+
+ async def start_tls(self, sslcontext, *,
+ server_hostname=None,
+ ssl_handshake_timeout=None):
+ """Upgrade an existing stream-based connection to TLS."""
+ server_side = self._protocol._client_connected_cb is not None
+ protocol = self._protocol
+ await self.drain()
+ new_transport = await self._loop.start_tls( # type: ignore
+ self._transport, protocol, sslcontext,
+ server_side=server_side, server_hostname=server_hostname,
+ ssl_handshake_timeout=ssl_handshake_timeout)
+ self._transport = new_transport
+ protocol._replace_writer(self)
+
+ def __del__(self):
+ if not self._transport.is_closing():
+ self.close()
+
+
+class StreamReader:
+
+ _source_traceback = None
+
+ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
+ # The line length limit is a security feature;
+ # it also doubles as half the buffer limit.
+
+ if limit <= 0:
+ raise ValueError('Limit cannot be <= 0')
+
+ self._limit = limit
+ if loop is None:
+ self._loop = events._get_event_loop()
+ else:
+ self._loop = loop
+ self._buffer = bytearray()
+ self._eof = False # Whether we're done.
+ self._waiter = None # A future used by _wait_for_data()
+ self._exception = None
+ self._transport = None
+ self._paused = False
+ if self._loop.get_debug():
+ self._source_traceback = format_helpers.extract_stack(
+ sys._getframe(1))
+
+ def __repr__(self):
+ info = ['StreamReader']
+ if self._buffer:
+ info.append(f'{len(self._buffer)} bytes')
+ if self._eof:
+ info.append('eof')
+ if self._limit != _DEFAULT_LIMIT:
+ info.append(f'limit={self._limit}')
+ if self._waiter:
+ info.append(f'waiter={self._waiter!r}')
+ if self._exception:
+ info.append(f'exception={self._exception!r}')
+ if self._transport:
+ info.append(f'transport={self._transport!r}')
+ if self._paused:
+ info.append('paused')
+ return '<{}>'.format(' '.join(info))
+
+ def exception(self):
+ return self._exception
+
+ def set_exception(self, exc):
+ self._exception = exc
+
+ waiter = self._waiter
+ if waiter is not None:
+ self._waiter = None
+ if not waiter.cancelled():
+ waiter.set_exception(exc)
+
+ def _wakeup_waiter(self):
+ """Wakeup read*() functions waiting for data or EOF."""
+ waiter = self._waiter
+ if waiter is not None:
+ self._waiter = None
+ if not waiter.cancelled():
+ waiter.set_result(None)
+
+ def set_transport(self, transport):
+ assert self._transport is None, 'Transport already set'
+ self._transport = transport
+
+ def _maybe_resume_transport(self):
+ if self._paused and len(self._buffer) <= self._limit:
+ self._paused = False
+ self._transport.resume_reading()
+
+ def feed_eof(self):
+ self._eof = True
+ self._wakeup_waiter()
+
+ def at_eof(self):
+ """Return True if the buffer is empty and 'feed_eof' was called."""
+ return self._eof and not self._buffer
+
+ def feed_data(self, data):
+ assert not self._eof, 'feed_data after feed_eof'
+
+ if not data:
+ return
+
+ self._buffer.extend(data)
+ self._wakeup_waiter()
+
+ if (self._transport is not None and
+ not self._paused and
+ len(self._buffer) > 2 * self._limit):
+ try:
+ self._transport.pause_reading()
+ except NotImplementedError:
+ # The transport can't be paused.
+ # We'll just have to buffer all data.
+ # Forget the transport so we don't keep trying.
+ self._transport = None
+ else:
+ self._paused = True
+
+ async def _wait_for_data(self, func_name):
+ """Wait until feed_data() or feed_eof() is called.
+
+ If stream was paused, automatically resume it.
+ """
+ # StreamReader uses a future to link the protocol feed_data() method
+ # to a read coroutine. Running two read coroutines at the same time
+ # would have an unexpected behaviour. It would not possible to know
+ # which coroutine would get the next data.
+ if self._waiter is not None:
+ raise RuntimeError(
+ f'{func_name}() called while another coroutine is '
+ f'already waiting for incoming data')
+
+ assert not self._eof, '_wait_for_data after EOF'
+
+ # Waiting for data while paused will make deadlock, so prevent it.
+ # This is essential for readexactly(n) for case when n > self._limit.
+ if self._paused:
+ self._paused = False
+ self._transport.resume_reading()
+
+ self._waiter = self._loop.create_future()
+ try:
+ await self._waiter
+ finally:
+ self._waiter = None
+
+ async def readline(self):
+ """Read chunk of data from the stream until newline (b'\n') is found.
+
+ On success, return chunk that ends with newline. If only partial
+ line can be read due to EOF, return incomplete line without
+ terminating newline. When EOF was reached while no bytes read, empty
+ bytes object is returned.
+
+ If limit is reached, ValueError will be raised. In that case, if
+ newline was found, complete line including newline will be removed
+ from internal buffer. Else, internal buffer will be cleared. Limit is
+ compared against part of the line without newline.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+ sep = b'\n'
+ seplen = len(sep)
+ try:
+ line = await self.readuntil(sep)
+ except exceptions.IncompleteReadError as e:
+ return e.partial
+ except exceptions.LimitOverrunError as e:
+ if self._buffer.startswith(sep, e.consumed):
+ del self._buffer[:e.consumed + seplen]
+ else:
+ self._buffer.clear()
+ self._maybe_resume_transport()
+ raise ValueError(e.args[0])
+ return line
+
+ async def readuntil(self, separator=b'\n'):
+ """Read data from the stream until ``separator`` is found.
+
+ On success, the data and separator will be removed from the
+ internal buffer (consumed). Returned data will include the
+ separator at the end.
+
+ Configured stream limit is used to check result. Limit sets the
+ maximal length of data that can be returned, not counting the
+ separator.
+
+ If an EOF occurs and the complete separator is still not found,
+ an IncompleteReadError exception will be raised, and the internal
+ buffer will be reset. The IncompleteReadError.partial attribute
+ may contain the separator partially.
+
+ If the data cannot be read because of over limit, a
+ LimitOverrunError exception will be raised, and the data
+ will be left in the internal buffer, so it can be read again.
+ """
+ seplen = len(separator)
+ if seplen == 0:
+ raise ValueError('Separator should be at least one-byte string')
+
+ if self._exception is not None:
+ raise self._exception
+
+ # Consume whole buffer except last bytes, which length is
+ # one less than seplen. Let's check corner cases with
+ # separator='SEPARATOR':
+ # * we have received almost complete separator (without last
+ # byte). i.e buffer='some textSEPARATO'. In this case we
+ # can safely consume len(separator) - 1 bytes.
+ # * last byte of buffer is first byte of separator, i.e.
+ # buffer='abcdefghijklmnopqrS'. We may safely consume
+ # everything except that last byte, but this require to
+ # analyze bytes of buffer that match partial separator.
+ # This is slow and/or require FSM. For this case our
+ # implementation is not optimal, since require rescanning
+ # of data that is known to not belong to separator. In
+ # real world, separator will not be so long to notice
+ # performance problems. Even when reading MIME-encoded
+ # messages :)
+
+ # `offset` is the number of bytes from the beginning of the buffer
+ # where there is no occurrence of `separator`.
+ offset = 0
+
+ # Loop until we find `separator` in the buffer, exceed the buffer size,
+ # or an EOF has happened.
+ while True:
+ buflen = len(self._buffer)
+
+ # Check if we now have enough data in the buffer for `separator` to
+ # fit.
+ if buflen - offset >= seplen:
+ isep = self._buffer.find(separator, offset)
+
+ if isep != -1:
+ # `separator` is in the buffer. `isep` will be used later
+ # to retrieve the data.
+ break
+
+ # see upper comment for explanation.
+ offset = buflen + 1 - seplen
+ if offset > self._limit:
+ raise exceptions.LimitOverrunError(
+ 'Separator is not found, and chunk exceed the limit',
+ offset)
+
+ # Complete message (with full separator) may be present in buffer
+ # even when EOF flag is set. This may happen when the last chunk
+ # adds data which makes separator be found. That's why we check for
+ # EOF *ater* inspecting the buffer.
+ if self._eof:
+ chunk = bytes(self._buffer)
+ self._buffer.clear()
+ raise exceptions.IncompleteReadError(chunk, None)
+
+ # _wait_for_data() will resume reading if stream was paused.
+ await self._wait_for_data('readuntil')
+
+ if isep > self._limit:
+ raise exceptions.LimitOverrunError(
+ 'Separator is found, but chunk is longer than limit', isep)
+
+ chunk = self._buffer[:isep + seplen]
+ del self._buffer[:isep + seplen]
+ self._maybe_resume_transport()
+ return bytes(chunk)
+
+ async def read(self, n=-1):
+ """Read up to `n` bytes from the stream.
+
+ If `n` is not provided or set to -1,
+ read until EOF, then return all read bytes.
+ If EOF was received and the internal buffer is empty,
+ return an empty bytes object.
+
+ If `n` is 0, return an empty bytes object immediately.
+
+ If `n` is positive, return at most `n` available bytes
+ as soon as at least 1 byte is available in the internal buffer.
+ If EOF is received before any byte is read, return an empty
+ bytes object.
+
+ Returned value is not limited with limit, configured at stream
+ creation.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+
+ if self._exception is not None:
+ raise self._exception
+
+ if n == 0:
+ return b''
+
+ if n < 0:
+ # This used to just loop creating a new waiter hoping to
+ # collect everything in self._buffer, but that would
+ # deadlock if the subprocess sends more than self.limit
+ # bytes. So just call self.read(self._limit) until EOF.
+ blocks = []
+ while True:
+ block = await self.read(self._limit)
+ if not block:
+ break
+ blocks.append(block)
+ return b''.join(blocks)
+
+ if not self._buffer and not self._eof:
+ await self._wait_for_data('read')
+
+ # This will work right even if buffer is less than n bytes
+ data = bytes(self._buffer[:n])
+ del self._buffer[:n]
+
+ self._maybe_resume_transport()
+ return data
+
+ async def readexactly(self, n):
+ """Read exactly `n` bytes.
+
+ Raise an IncompleteReadError if EOF is reached before `n` bytes can be
+ read. The IncompleteReadError.partial attribute of the exception will
+ contain the partial read bytes.
+
+ if n is zero, return empty bytes object.
+
+ Returned value is not limited with limit, configured at stream
+ creation.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+ if n < 0:
+ raise ValueError('readexactly size can not be less than zero')
+
+ if self._exception is not None:
+ raise self._exception
+
+ if n == 0:
+ return b''
+
+ while len(self._buffer) < n:
+ if self._eof:
+ incomplete = bytes(self._buffer)
+ self._buffer.clear()
+ raise exceptions.IncompleteReadError(incomplete, n)
+
+ await self._wait_for_data('readexactly')
+
+ if len(self._buffer) == n:
+ data = bytes(self._buffer)
+ self._buffer.clear()
+ else:
+ data = bytes(self._buffer[:n])
+ del self._buffer[:n]
+ self._maybe_resume_transport()
+ return data
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ val = await self.readline()
+ if val == b'':
+ raise StopAsyncIteration
+ return val