author     orivej <orivej@yandex-team.ru>  2022-02-10 16:44:49 +0300
committer  Daniil Cherednik <dcherednik@yandex-team.ru>  2022-02-10 16:44:49 +0300
commit     718c552901d703c502ccbefdfc3c9028d608b947 (patch)
tree       46534a98bbefcd7b1f3faa5b52c138ab27db75b7  /contrib/tools/python3/src/Lib/asyncio/streams.py
parent     e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff)
download   ydb-718c552901d703c502ccbefdfc3c9028d608b947.tar.gz
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/streams.py')
-rw-r--r--  contrib/tools/python3/src/Lib/asyncio/streams.py  1292
1 file changed, 646 insertions(+), 646 deletions(-)
diff --git a/contrib/tools/python3/src/Lib/asyncio/streams.py b/contrib/tools/python3/src/Lib/asyncio/streams.py
index 3c80bb8892..432c9d671f 100644
--- a/contrib/tools/python3/src/Lib/asyncio/streams.py
+++ b/contrib/tools/python3/src/Lib/asyncio/streams.py
@@ -1,218 +1,218 @@
-__all__ = (
- 'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
+__all__ = (
+ 'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
'open_connection', 'start_server')
-
-import socket
+
+import socket
import sys
import warnings
import weakref
-
-if hasattr(socket, 'AF_UNIX'):
- __all__ += ('open_unix_connection', 'start_unix_server')
-
-from . import coroutines
-from . import events
+
+if hasattr(socket, 'AF_UNIX'):
+ __all__ += ('open_unix_connection', 'start_unix_server')
+
+from . import coroutines
+from . import events
from . import exceptions
from . import format_helpers
-from . import protocols
-from .log import logger
-from .tasks import sleep
-
-
-_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
-
-
-async def open_connection(host=None, port=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
- """A wrapper for create_connection() returning a (reader, writer) pair.
-
- The reader returned is a StreamReader instance; the writer is a
- StreamWriter instance.
-
- The arguments are all the usual arguments to create_connection()
- except protocol_factory; most common are positional host and port,
- with various optional keyword arguments following.
-
- Additional optional keyword arguments are loop (to set the event loop
- instance to use) and limit (to set the buffer limit passed to the
- StreamReader).
-
- (If you want to customize the StreamReader and/or
- StreamReaderProtocol classes, just copy the code -- there's
- really nothing special here except some convenience.)
- """
- if loop is None:
- loop = events.get_event_loop()
+from . import protocols
+from .log import logger
+from .tasks import sleep
+
+
+_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
+
+
+async def open_connection(host=None, port=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ """A wrapper for create_connection() returning a (reader, writer) pair.
+
+ The reader returned is a StreamReader instance; the writer is a
+ StreamWriter instance.
+
+ The arguments are all the usual arguments to create_connection()
+ except protocol_factory; most common are positional host and port,
+ with various optional keyword arguments following.
+
+ Additional optional keyword arguments are loop (to set the event loop
+ instance to use) and limit (to set the buffer limit passed to the
+ StreamReader).
+
+ (If you want to customize the StreamReader and/or
+ StreamReaderProtocol classes, just copy the code -- there's
+ really nothing special here except some convenience.)
+ """
+ if loop is None:
+ loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
- reader = StreamReader(limit=limit, loop=loop)
- protocol = StreamReaderProtocol(reader, loop=loop)
- transport, _ = await loop.create_connection(
- lambda: protocol, host, port, **kwds)
- writer = StreamWriter(transport, protocol, reader, loop)
- return reader, writer
-
-
-async def start_server(client_connected_cb, host=None, port=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
- """Start a socket server, call back for each client connected.
-
- The first parameter, `client_connected_cb`, takes two parameters:
- client_reader, client_writer. client_reader is a StreamReader
- object, while client_writer is a StreamWriter object. This
- parameter can either be a plain callback function or a coroutine;
- if it is a coroutine, it will be automatically converted into a
- Task.
-
- The rest of the arguments are all the usual arguments to
- loop.create_server() except protocol_factory; most common are
- positional host and port, with various optional keyword arguments
- following. The return value is the same as loop.create_server().
-
- Additional optional keyword arguments are loop (to set the event loop
- instance to use) and limit (to set the buffer limit passed to the
- StreamReader).
-
- The return value is the same as loop.create_server(), i.e. a
- Server object which can be used to stop the service.
- """
- if loop is None:
- loop = events.get_event_loop()
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader, loop=loop)
+ transport, _ = await loop.create_connection(
+ lambda: protocol, host, port, **kwds)
+ writer = StreamWriter(transport, protocol, reader, loop)
+ return reader, writer
+
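As the docstring notes, this wrapper only wires a StreamReader and StreamWriter onto create_connection(). A minimal client sketch of the returned pair (the address and payload are placeholders, not part of this module):

    import asyncio

    async def ping_once():
        # open_connection() returns the (reader, writer) pair described above.
        reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
        writer.write(b'ping\n')           # buffered on the transport
        await writer.drain()              # cooperate with flow control
        reply = await reader.readline()   # read until b'\n' or EOF
        writer.close()
        await writer.wait_closed()
        return reply

    # asyncio.run(ping_once())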
+
+async def start_server(client_connected_cb, host=None, port=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ """Start a socket server, call back for each client connected.
+
+ The first parameter, `client_connected_cb`, takes two parameters:
+ client_reader, client_writer. client_reader is a StreamReader
+ object, while client_writer is a StreamWriter object. This
+ parameter can either be a plain callback function or a coroutine;
+ if it is a coroutine, it will be automatically converted into a
+ Task.
+
+ The rest of the arguments are all the usual arguments to
+ loop.create_server() except protocol_factory; most common are
+ positional host and port, with various optional keyword arguments
+ following. The return value is the same as loop.create_server().
+
+ Additional optional keyword arguments are loop (to set the event loop
+ instance to use) and limit (to set the buffer limit passed to the
+ StreamReader).
+
+ The return value is the same as loop.create_server(), i.e. a
+ Server object which can be used to stop the service.
+ """
+ if loop is None:
+ loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
-
- def factory():
- reader = StreamReader(limit=limit, loop=loop)
- protocol = StreamReaderProtocol(reader, client_connected_cb,
- loop=loop)
- return protocol
-
- return await loop.create_server(factory, host, port, **kwds)
-
-
-if hasattr(socket, 'AF_UNIX'):
- # UNIX Domain Sockets are supported on this platform
-
- async def open_unix_connection(path=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
- """Similar to `open_connection` but works with UNIX Domain Sockets."""
- if loop is None:
- loop = events.get_event_loop()
+
+ def factory():
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader, client_connected_cb,
+ loop=loop)
+ return protocol
+
+ return await loop.create_server(factory, host, port, **kwds)
+
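A sketch of the callback contract described in the docstring above; because handle_echo is a coroutine, each connection is wrapped in a Task automatically (the address is a placeholder):

    import asyncio

    async def handle_echo(reader, writer):
        # client_connected_cb receives exactly this (reader, writer) pair.
        data = await reader.readline()
        writer.write(data)                # echo the line back
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    async def main():
        server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
        async with server:                # the returned Server is a context manager
            await server.serve_forever()

    # asyncio.run(main())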
+
+if hasattr(socket, 'AF_UNIX'):
+ # UNIX Domain Sockets are supported on this platform
+
+ async def open_unix_connection(path=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ """Similar to `open_connection` but works with UNIX Domain Sockets."""
+ if loop is None:
+ loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
- reader = StreamReader(limit=limit, loop=loop)
- protocol = StreamReaderProtocol(reader, loop=loop)
- transport, _ = await loop.create_unix_connection(
- lambda: protocol, path, **kwds)
- writer = StreamWriter(transport, protocol, reader, loop)
- return reader, writer
-
- async def start_unix_server(client_connected_cb, path=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
- """Similar to `start_server` but works with UNIX Domain Sockets."""
- if loop is None:
- loop = events.get_event_loop()
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader, loop=loop)
+ transport, _ = await loop.create_unix_connection(
+ lambda: protocol, path, **kwds)
+ writer = StreamWriter(transport, protocol, reader, loop)
+ return reader, writer
+
+ async def start_unix_server(client_connected_cb, path=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ """Similar to `start_server` but works with UNIX Domain Sockets."""
+ if loop is None:
+ loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
-
- def factory():
- reader = StreamReader(limit=limit, loop=loop)
- protocol = StreamReaderProtocol(reader, client_connected_cb,
- loop=loop)
- return protocol
-
- return await loop.create_unix_server(factory, path, **kwds)
-
-
-class FlowControlMixin(protocols.Protocol):
- """Reusable flow control logic for StreamWriter.drain().
-
- This implements the protocol methods pause_writing(),
- resume_writing() and connection_lost(). If the subclass overrides
- these it must call the super methods.
-
- StreamWriter.drain() must wait for _drain_helper() coroutine.
- """
-
- def __init__(self, loop=None):
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- self._paused = False
- self._drain_waiter = None
- self._connection_lost = False
-
- def pause_writing(self):
- assert not self._paused
- self._paused = True
- if self._loop.get_debug():
- logger.debug("%r pauses writing", self)
-
- def resume_writing(self):
- assert self._paused
- self._paused = False
- if self._loop.get_debug():
- logger.debug("%r resumes writing", self)
-
- waiter = self._drain_waiter
- if waiter is not None:
- self._drain_waiter = None
- if not waiter.done():
- waiter.set_result(None)
-
- def connection_lost(self, exc):
- self._connection_lost = True
- # Wake up the writer if currently paused.
- if not self._paused:
- return
- waiter = self._drain_waiter
- if waiter is None:
- return
- self._drain_waiter = None
- if waiter.done():
- return
- if exc is None:
- waiter.set_result(None)
- else:
- waiter.set_exception(exc)
-
- async def _drain_helper(self):
- if self._connection_lost:
- raise ConnectionResetError('Connection lost')
- if not self._paused:
- return
- waiter = self._drain_waiter
- assert waiter is None or waiter.cancelled()
- waiter = self._loop.create_future()
- self._drain_waiter = waiter
- await waiter
-
+
+ def factory():
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader, client_connected_cb,
+ loop=loop)
+ return protocol
+
+ return await loop.create_unix_server(factory, path, **kwds)
+
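On platforms with AF_UNIX, the same client pattern applies with a filesystem path instead of a host/port pair (the socket path here is a placeholder):

    import asyncio

    async def unix_ping(path='/tmp/demo.sock'):
        reader, writer = await asyncio.open_unix_connection(path)
        writer.write(b'ping\n')
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
        return reply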
+
+class FlowControlMixin(protocols.Protocol):
+ """Reusable flow control logic for StreamWriter.drain().
+
+ This implements the protocol methods pause_writing(),
+ resume_writing() and connection_lost(). If the subclass overrides
+ these it must call the super methods.
+
+ StreamWriter.drain() must wait for _drain_helper() coroutine.
+ """
+
+ def __init__(self, loop=None):
+ if loop is None:
+ self._loop = events.get_event_loop()
+ else:
+ self._loop = loop
+ self._paused = False
+ self._drain_waiter = None
+ self._connection_lost = False
+
+ def pause_writing(self):
+ assert not self._paused
+ self._paused = True
+ if self._loop.get_debug():
+ logger.debug("%r pauses writing", self)
+
+ def resume_writing(self):
+ assert self._paused
+ self._paused = False
+ if self._loop.get_debug():
+ logger.debug("%r resumes writing", self)
+
+ waiter = self._drain_waiter
+ if waiter is not None:
+ self._drain_waiter = None
+ if not waiter.done():
+ waiter.set_result(None)
+
+ def connection_lost(self, exc):
+ self._connection_lost = True
+ # Wake up the writer if currently paused.
+ if not self._paused:
+ return
+ waiter = self._drain_waiter
+ if waiter is None:
+ return
+ self._drain_waiter = None
+ if waiter.done():
+ return
+ if exc is None:
+ waiter.set_result(None)
+ else:
+ waiter.set_exception(exc)
+
+ async def _drain_helper(self):
+ if self._connection_lost:
+ raise ConnectionResetError('Connection lost')
+ if not self._paused:
+ return
+ waiter = self._drain_waiter
+ assert waiter is None or waiter.cancelled()
+ waiter = self._loop.create_future()
+ self._drain_waiter = waiter
+ await waiter
+
def _get_close_waiter(self, stream):
raise NotImplementedError
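A sketch of the override contract stated in the docstring: a subclass may hook pause_writing()/resume_writing(), but must delegate to super() so the _drain_waiter bookkeeping keeps releasing paused drain() calls. FlowControlMixin is a private helper; importing it directly is shown only for illustration:

    import asyncio
    from asyncio.streams import FlowControlMixin  # private API, for illustration

    class TracingFlowControl(FlowControlMixin):
        def pause_writing(self):
            print('transport buffer full; writers should drain()')
            super().pause_writing()       # keeps self._paused consistent

        def resume_writing(self):
            print('transport drained; paused drain() calls resume')
            super().resume_writing()      # releases self._drain_waiter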
-
-
-class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
- """Helper class to adapt between Protocol and StreamReader.
-
- (This is a helper class instead of making StreamReader itself a
- Protocol subclass, because the StreamReader has other potential
- uses, and to prevent the user of the StreamReader from accidentally
- calling inappropriate methods of the protocol.)
- """
-
+
+
+class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
+ """Helper class to adapt between Protocol and StreamReader.
+
+ (This is a helper class instead of making StreamReader itself a
+ Protocol subclass, because the StreamReader has other potential
+ uses, and to prevent the user of the StreamReader from accidentally
+ calling inappropriate methods of the protocol.)
+ """
+
_source_traceback = None
- def __init__(self, stream_reader, client_connected_cb=None, loop=None):
- super().__init__(loop=loop)
+ def __init__(self, stream_reader, client_connected_cb=None, loop=None):
+ super().__init__(loop=loop)
if stream_reader is not None:
self._stream_reader_wr = weakref.ref(stream_reader)
self._source_traceback = stream_reader._source_traceback
@@ -224,19 +224,19 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
# is established.
self._strong_reader = stream_reader
self._reject_connection = False
- self._stream_writer = None
+ self._stream_writer = None
self._transport = None
- self._client_connected_cb = client_connected_cb
- self._over_ssl = False
- self._closed = self._loop.create_future()
-
+ self._client_connected_cb = client_connected_cb
+ self._over_ssl = False
+ self._closed = self._loop.create_future()
+
@property
def _stream_reader(self):
if self._stream_reader_wr is None:
return None
return self._stream_reader_wr()
- def connection_made(self, transport):
+ def connection_made(self, transport):
if self._reject_connection:
context = {
'message': ('An open stream was garbage collected prior to '
@@ -252,490 +252,490 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
reader = self._stream_reader
if reader is not None:
reader.set_transport(transport)
- self._over_ssl = transport.get_extra_info('sslcontext') is not None
- if self._client_connected_cb is not None:
- self._stream_writer = StreamWriter(transport, self,
+ self._over_ssl = transport.get_extra_info('sslcontext') is not None
+ if self._client_connected_cb is not None:
+ self._stream_writer = StreamWriter(transport, self,
reader,
- self._loop)
+ self._loop)
res = self._client_connected_cb(reader,
- self._stream_writer)
- if coroutines.iscoroutine(res):
- self._loop.create_task(res)
+ self._stream_writer)
+ if coroutines.iscoroutine(res):
+ self._loop.create_task(res)
self._strong_reader = None
-
- def connection_lost(self, exc):
+
+ def connection_lost(self, exc):
reader = self._stream_reader
if reader is not None:
- if exc is None:
+ if exc is None:
reader.feed_eof()
- else:
+ else:
reader.set_exception(exc)
- if not self._closed.done():
- if exc is None:
- self._closed.set_result(None)
- else:
- self._closed.set_exception(exc)
- super().connection_lost(exc)
+ if not self._closed.done():
+ if exc is None:
+ self._closed.set_result(None)
+ else:
+ self._closed.set_exception(exc)
+ super().connection_lost(exc)
self._stream_reader_wr = None
- self._stream_writer = None
+ self._stream_writer = None
self._transport = None
-
- def data_received(self, data):
+
+ def data_received(self, data):
reader = self._stream_reader
if reader is not None:
reader.feed_data(data)
-
- def eof_received(self):
+
+ def eof_received(self):
reader = self._stream_reader
if reader is not None:
reader.feed_eof()
- if self._over_ssl:
- # Prevent a warning in SSLProtocol.eof_received:
- # "returning true from eof_received()
- # has no effect when using ssl"
- return False
- return True
-
+ if self._over_ssl:
+ # Prevent a warning in SSLProtocol.eof_received:
+ # "returning true from eof_received()
+ # has no effect when using ssl"
+ return False
+ return True
+
def _get_close_waiter(self, stream):
return self._closed
- def __del__(self):
- # Prevent reports about unhandled exceptions.
- # Better than self._closed._log_traceback = False hack
- closed = self._closed
- if closed.done() and not closed.cancelled():
- closed.exception()
-
-
-class StreamWriter:
- """Wraps a Transport.
-
- This exposes write(), writelines(), [can_]write_eof(),
- get_extra_info() and close(). It adds drain() which returns an
- optional Future on which you can wait for flow control. It also
- adds a transport property which references the Transport
- directly.
- """
-
- def __init__(self, transport, protocol, reader, loop):
- self._transport = transport
- self._protocol = protocol
- # drain() expects that the reader has an exception() method
- assert reader is None or isinstance(reader, StreamReader)
- self._reader = reader
- self._loop = loop
+ def __del__(self):
+ # Prevent reports about unhandled exceptions.
+ # Better than self._closed._log_traceback = False hack
+ closed = self._closed
+ if closed.done() and not closed.cancelled():
+ closed.exception()
+
+
+class StreamWriter:
+ """Wraps a Transport.
+
+ This exposes write(), writelines(), [can_]write_eof(),
+ get_extra_info() and close(). It adds drain() which returns an
+ optional Future on which you can wait for flow control. It also
+ adds a transport property which references the Transport
+ directly.
+ """
+
+ def __init__(self, transport, protocol, reader, loop):
+ self._transport = transport
+ self._protocol = protocol
+ # drain() expects that the reader has an exception() method
+ assert reader is None or isinstance(reader, StreamReader)
+ self._reader = reader
+ self._loop = loop
self._complete_fut = self._loop.create_future()
self._complete_fut.set_result(None)
-
- def __repr__(self):
- info = [self.__class__.__name__, f'transport={self._transport!r}']
- if self._reader is not None:
- info.append(f'reader={self._reader!r}')
- return '<{}>'.format(' '.join(info))
-
- @property
- def transport(self):
- return self._transport
-
- def write(self, data):
- self._transport.write(data)
-
- def writelines(self, data):
- self._transport.writelines(data)
-
- def write_eof(self):
- return self._transport.write_eof()
-
- def can_write_eof(self):
- return self._transport.can_write_eof()
-
- def close(self):
- return self._transport.close()
-
- def is_closing(self):
- return self._transport.is_closing()
-
- async def wait_closed(self):
+
+ def __repr__(self):
+ info = [self.__class__.__name__, f'transport={self._transport!r}']
+ if self._reader is not None:
+ info.append(f'reader={self._reader!r}')
+ return '<{}>'.format(' '.join(info))
+
+ @property
+ def transport(self):
+ return self._transport
+
+ def write(self, data):
+ self._transport.write(data)
+
+ def writelines(self, data):
+ self._transport.writelines(data)
+
+ def write_eof(self):
+ return self._transport.write_eof()
+
+ def can_write_eof(self):
+ return self._transport.can_write_eof()
+
+ def close(self):
+ return self._transport.close()
+
+ def is_closing(self):
+ return self._transport.is_closing()
+
+ async def wait_closed(self):
await self._protocol._get_close_waiter(self)
-
- def get_extra_info(self, name, default=None):
- return self._transport.get_extra_info(name, default)
-
- async def drain(self):
- """Flush the write buffer.
-
- The intended use is to write
-
- w.write(data)
- await w.drain()
- """
- if self._reader is not None:
- exc = self._reader.exception()
- if exc is not None:
- raise exc
- if self._transport.is_closing():
+
+ def get_extra_info(self, name, default=None):
+ return self._transport.get_extra_info(name, default)
+
+ async def drain(self):
+ """Flush the write buffer.
+
+ The intended use is to write
+
+ w.write(data)
+ await w.drain()
+ """
+ if self._reader is not None:
+ exc = self._reader.exception()
+ if exc is not None:
+ raise exc
+ if self._transport.is_closing():
# Wait for protocol.connection_lost() call
# Raise connection closing error if any,
# ConnectionResetError otherwise
- # Yield to the event loop so connection_lost() may be
- # called. Without this, _drain_helper() would return
- # immediately, and code that calls
- # write(...); await drain()
- # in a loop would never call connection_lost(), so it
- # would not see an error when the socket is closed.
+ # Yield to the event loop so connection_lost() may be
+ # called. Without this, _drain_helper() would return
+ # immediately, and code that calls
+ # write(...); await drain()
+ # in a loop would never call connection_lost(), so it
+ # would not see an error when the socket is closed.
await sleep(0)
- await self._protocol._drain_helper()
-
-
-class StreamReader:
-
+ await self._protocol._drain_helper()
+
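The write-then-drain pattern from the drain() docstring, as a small helper (a sketch; `writer` is any StreamWriter):

    async def send(writer, payload):
        writer.write(payload)     # synchronous: only buffers on the transport
        await writer.drain()      # yields while the protocol is paused and
                                  # re-raises any exception seen by the reader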
+
+class StreamReader:
+
_source_traceback = None
- def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
- # The line length limit is a security feature;
- # it also doubles as half the buffer limit.
-
- if limit <= 0:
- raise ValueError('Limit cannot be <= 0')
-
- self._limit = limit
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- self._buffer = bytearray()
- self._eof = False # Whether we're done.
- self._waiter = None # A future used by _wait_for_data()
- self._exception = None
- self._transport = None
- self._paused = False
+ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
+ # The line length limit is a security feature;
+ # it also doubles as half the buffer limit.
+
+ if limit <= 0:
+ raise ValueError('Limit cannot be <= 0')
+
+ self._limit = limit
+ if loop is None:
+ self._loop = events.get_event_loop()
+ else:
+ self._loop = loop
+ self._buffer = bytearray()
+ self._eof = False # Whether we're done.
+ self._waiter = None # A future used by _wait_for_data()
+ self._exception = None
+ self._transport = None
+ self._paused = False
if self._loop.get_debug():
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))
-
- def __repr__(self):
- info = ['StreamReader']
- if self._buffer:
- info.append(f'{len(self._buffer)} bytes')
- if self._eof:
- info.append('eof')
- if self._limit != _DEFAULT_LIMIT:
- info.append(f'limit={self._limit}')
- if self._waiter:
- info.append(f'waiter={self._waiter!r}')
- if self._exception:
- info.append(f'exception={self._exception!r}')
- if self._transport:
- info.append(f'transport={self._transport!r}')
- if self._paused:
- info.append('paused')
- return '<{}>'.format(' '.join(info))
-
- def exception(self):
- return self._exception
-
- def set_exception(self, exc):
- self._exception = exc
-
- waiter = self._waiter
- if waiter is not None:
- self._waiter = None
- if not waiter.cancelled():
- waiter.set_exception(exc)
-
- def _wakeup_waiter(self):
- """Wakeup read*() functions waiting for data or EOF."""
- waiter = self._waiter
- if waiter is not None:
- self._waiter = None
- if not waiter.cancelled():
- waiter.set_result(None)
-
- def set_transport(self, transport):
- assert self._transport is None, 'Transport already set'
- self._transport = transport
-
- def _maybe_resume_transport(self):
- if self._paused and len(self._buffer) <= self._limit:
- self._paused = False
- self._transport.resume_reading()
-
- def feed_eof(self):
- self._eof = True
- self._wakeup_waiter()
-
- def at_eof(self):
- """Return True if the buffer is empty and 'feed_eof' was called."""
- return self._eof and not self._buffer
-
- def feed_data(self, data):
- assert not self._eof, 'feed_data after feed_eof'
-
- if not data:
- return
-
- self._buffer.extend(data)
- self._wakeup_waiter()
-
- if (self._transport is not None and
- not self._paused and
- len(self._buffer) > 2 * self._limit):
- try:
- self._transport.pause_reading()
- except NotImplementedError:
- # The transport can't be paused.
- # We'll just have to buffer all data.
- # Forget the transport so we don't keep trying.
- self._transport = None
- else:
- self._paused = True
-
- async def _wait_for_data(self, func_name):
- """Wait until feed_data() or feed_eof() is called.
-
- If stream was paused, automatically resume it.
- """
- # StreamReader uses a future to link the protocol feed_data() method
- # to a read coroutine. Running two read coroutines at the same time
- # would have an unexpected behaviour. It would not be possible to know
- # which coroutine would get the next data.
- if self._waiter is not None:
- raise RuntimeError(
- f'{func_name}() called while another coroutine is '
- f'already waiting for incoming data')
-
- assert not self._eof, '_wait_for_data after EOF'
-
- # Waiting for data while paused will deadlock, so prevent it.
- # This is essential for readexactly(n) for case when n > self._limit.
- if self._paused:
- self._paused = False
- self._transport.resume_reading()
-
- self._waiter = self._loop.create_future()
- try:
- await self._waiter
- finally:
- self._waiter = None
-
- async def readline(self):
- """Read chunk of data from the stream until newline (b'\n') is found.
-
- On success, return a chunk that ends with newline. If only a partial
- line can be read due to EOF, return the incomplete line without the
- terminating newline. If EOF is reached before any bytes are read,
- an empty bytes object is returned.
-
- If limit is reached, ValueError will be raised. In that case, if
- newline was found, complete line including newline will be removed
- from internal buffer. Else, internal buffer will be cleared. Limit is
- compared against part of the line without newline.
-
- If stream was paused, this function will automatically resume it if
- needed.
- """
- sep = b'\n'
- seplen = len(sep)
- try:
- line = await self.readuntil(sep)
+
+ def __repr__(self):
+ info = ['StreamReader']
+ if self._buffer:
+ info.append(f'{len(self._buffer)} bytes')
+ if self._eof:
+ info.append('eof')
+ if self._limit != _DEFAULT_LIMIT:
+ info.append(f'limit={self._limit}')
+ if self._waiter:
+ info.append(f'waiter={self._waiter!r}')
+ if self._exception:
+ info.append(f'exception={self._exception!r}')
+ if self._transport:
+ info.append(f'transport={self._transport!r}')
+ if self._paused:
+ info.append('paused')
+ return '<{}>'.format(' '.join(info))
+
+ def exception(self):
+ return self._exception
+
+ def set_exception(self, exc):
+ self._exception = exc
+
+ waiter = self._waiter
+ if waiter is not None:
+ self._waiter = None
+ if not waiter.cancelled():
+ waiter.set_exception(exc)
+
+ def _wakeup_waiter(self):
+ """Wakeup read*() functions waiting for data or EOF."""
+ waiter = self._waiter
+ if waiter is not None:
+ self._waiter = None
+ if not waiter.cancelled():
+ waiter.set_result(None)
+
+ def set_transport(self, transport):
+ assert self._transport is None, 'Transport already set'
+ self._transport = transport
+
+ def _maybe_resume_transport(self):
+ if self._paused and len(self._buffer) <= self._limit:
+ self._paused = False
+ self._transport.resume_reading()
+
+ def feed_eof(self):
+ self._eof = True
+ self._wakeup_waiter()
+
+ def at_eof(self):
+ """Return True if the buffer is empty and 'feed_eof' was called."""
+ return self._eof and not self._buffer
+
+ def feed_data(self, data):
+ assert not self._eof, 'feed_data after feed_eof'
+
+ if not data:
+ return
+
+ self._buffer.extend(data)
+ self._wakeup_waiter()
+
+ if (self._transport is not None and
+ not self._paused and
+ len(self._buffer) > 2 * self._limit):
+ try:
+ self._transport.pause_reading()
+ except NotImplementedError:
+ # The transport can't be paused.
+ # We'll just have to buffer all data.
+ # Forget the transport so we don't keep trying.
+ self._transport = None
+ else:
+ self._paused = True
+
+ async def _wait_for_data(self, func_name):
+ """Wait until feed_data() or feed_eof() is called.
+
+ If stream was paused, automatically resume it.
+ """
+ # StreamReader uses a future to link the protocol feed_data() method
+ # to a read coroutine. Running two read coroutines at the same time
+ # would have an unexpected behaviour. It would not be possible to know
+ # which coroutine would get the next data.
+ if self._waiter is not None:
+ raise RuntimeError(
+ f'{func_name}() called while another coroutine is '
+ f'already waiting for incoming data')
+
+ assert not self._eof, '_wait_for_data after EOF'
+
+ # Waiting for data while paused will deadlock, so prevent it.
+ # This is essential for readexactly(n) for case when n > self._limit.
+ if self._paused:
+ self._paused = False
+ self._transport.resume_reading()
+
+ self._waiter = self._loop.create_future()
+ try:
+ await self._waiter
+ finally:
+ self._waiter = None
+
+ async def readline(self):
+ """Read chunk of data from the stream until newline (b'\n') is found.
+
+ On success, return a chunk that ends with newline. If only a partial
+ line can be read due to EOF, return the incomplete line without the
+ terminating newline. If EOF is reached before any bytes are read,
+ an empty bytes object is returned.
+
+ If limit is reached, ValueError will be raised. In that case, if
+ newline was found, complete line including newline will be removed
+ from internal buffer. Else, internal buffer will be cleared. Limit is
+ compared against part of the line without newline.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+ sep = b'\n'
+ seplen = len(sep)
+ try:
+ line = await self.readuntil(sep)
except exceptions.IncompleteReadError as e:
- return e.partial
+ return e.partial
except exceptions.LimitOverrunError as e:
- if self._buffer.startswith(sep, e.consumed):
- del self._buffer[:e.consumed + seplen]
- else:
- self._buffer.clear()
- self._maybe_resume_transport()
- raise ValueError(e.args[0])
- return line
-
- async def readuntil(self, separator=b'\n'):
- """Read data from the stream until ``separator`` is found.
-
- On success, the data and separator will be removed from the
- internal buffer (consumed). Returned data will include the
- separator at the end.
-
- Configured stream limit is used to check result. Limit sets the
- maximal length of data that can be returned, not counting the
- separator.
-
- If an EOF occurs and the complete separator is still not found,
- an IncompleteReadError exception will be raised, and the internal
- buffer will be reset. The IncompleteReadError.partial attribute
- may contain the separator partially.
-
- If the data cannot be read because of over limit, a
- LimitOverrunError exception will be raised, and the data
- will be left in the internal buffer, so it can be read again.
- """
- seplen = len(separator)
- if seplen == 0:
- raise ValueError('Separator should be at least one-byte string')
-
- if self._exception is not None:
- raise self._exception
-
- # Consume the whole buffer except the last bytes, whose length
- # is one less than seplen. Let's check corner cases with
- # separator='SEPARATOR':
- # * we have received almost complete separator (without last
- # byte). i.e buffer='some textSEPARATO'. In this case we
- # can safely consume len(separator) - 1 bytes.
- # * last byte of buffer is first byte of separator, i.e.
- # buffer='abcdefghijklmnopqrS'. We may safely consume
- # everything except that last byte, but this requires
- # analyzing the bytes of the buffer that match a partial
- # separator. This is slow and/or requires an FSM. For this
- # case our implementation is not optimal, since it requires
- # rescanning data that is known not to belong to the
- # separator. In the real world, the separator will not be
- # long enough to cause performance problems. Even when
- # reading MIME-encoded messages :)
-
- # `offset` is the number of bytes from the beginning of the buffer
- # where there is no occurrence of `separator`.
- offset = 0
-
- # Loop until we find `separator` in the buffer, exceed the buffer size,
- # or an EOF has happened.
- while True:
- buflen = len(self._buffer)
-
- # Check if we now have enough data in the buffer for `separator` to
- # fit.
- if buflen - offset >= seplen:
- isep = self._buffer.find(separator, offset)
-
- if isep != -1:
- # `separator` is in the buffer. `isep` will be used later
- # to retrieve the data.
- break
-
- # see upper comment for explanation.
- offset = buflen + 1 - seplen
- if offset > self._limit:
+ if self._buffer.startswith(sep, e.consumed):
+ del self._buffer[:e.consumed + seplen]
+ else:
+ self._buffer.clear()
+ self._maybe_resume_transport()
+ raise ValueError(e.args[0])
+ return line
+
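Per the docstring, readline() returns b'' only at EOF and returns a partial final line without its newline, so a read loop can key off the empty result (a sketch):

    async def iter_lines(reader):
        while True:
            line = await reader.readline()
            if not line:                  # b'' means EOF with an empty buffer
                break
            yield line.rstrip(b'\n')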
+ async def readuntil(self, separator=b'\n'):
+ """Read data from the stream until ``separator`` is found.
+
+ On success, the data and separator will be removed from the
+ internal buffer (consumed). Returned data will include the
+ separator at the end.
+
+ Configured stream limit is used to check result. Limit sets the
+ maximal length of data that can be returned, not counting the
+ separator.
+
+ If an EOF occurs and the complete separator is still not found,
+ an IncompleteReadError exception will be raised, and the internal
+ buffer will be reset. The IncompleteReadError.partial attribute
+ may contain the separator partially.
+
+ If the data cannot be read because of over limit, a
+ LimitOverrunError exception will be raised, and the data
+ will be left in the internal buffer, so it can be read again.
+ """
+ seplen = len(separator)
+ if seplen == 0:
+ raise ValueError('Separator should be at least one-byte string')
+
+ if self._exception is not None:
+ raise self._exception
+
+ # Consume the whole buffer except the last bytes, whose length
+ # is one less than seplen. Let's check corner cases with
+ # separator='SEPARATOR':
+ # * we have received almost complete separator (without last
+ # byte). i.e buffer='some textSEPARATO'. In this case we
+ # can safely consume len(separator) - 1 bytes.
+ # * last byte of buffer is first byte of separator, i.e.
+ # buffer='abcdefghijklmnopqrS'. We may safely consume
+ # everything except that last byte, but this requires
+ # analyzing the bytes of the buffer that match a partial
+ # separator. This is slow and/or requires an FSM. For this
+ # case our implementation is not optimal, since it requires
+ # rescanning data that is known not to belong to the
+ # separator. In the real world, the separator will not be
+ # long enough to cause performance problems. Even when
+ # reading MIME-encoded messages :)
+
+ # `offset` is the number of bytes from the beginning of the buffer
+ # where there is no occurrence of `separator`.
+ offset = 0
+
+ # Loop until we find `separator` in the buffer, exceed the buffer size,
+ # or an EOF has happened.
+ while True:
+ buflen = len(self._buffer)
+
+ # Check if we now have enough data in the buffer for `separator` to
+ # fit.
+ if buflen - offset >= seplen:
+ isep = self._buffer.find(separator, offset)
+
+ if isep != -1:
+ # `separator` is in the buffer. `isep` will be used later
+ # to retrieve the data.
+ break
+
+ # see upper comment for explanation.
+ offset = buflen + 1 - seplen
+ if offset > self._limit:
raise exceptions.LimitOverrunError(
- 'Separator is not found, and chunk exceed the limit',
- offset)
-
- # Complete message (with full separator) may be present in buffer
- # even when EOF flag is set. This may happen when the last chunk
- # adds data which makes separator be found. That's why we check for
- # EOF *after* inspecting the buffer.
- if self._eof:
- chunk = bytes(self._buffer)
- self._buffer.clear()
+ 'Separator is not found, and chunk exceed the limit',
+ offset)
+
+ # Complete message (with full separator) may be present in buffer
+ # even when EOF flag is set. This may happen when the last chunk
+ # adds data which makes separator be found. That's why we check for
+ # EOF *after* inspecting the buffer.
+ if self._eof:
+ chunk = bytes(self._buffer)
+ self._buffer.clear()
raise exceptions.IncompleteReadError(chunk, None)
-
- # _wait_for_data() will resume reading if stream was paused.
- await self._wait_for_data('readuntil')
-
- if isep > self._limit:
+
+ # _wait_for_data() will resume reading if stream was paused.
+ await self._wait_for_data('readuntil')
+
+ if isep > self._limit:
raise exceptions.LimitOverrunError(
- 'Separator is found, but chunk is longer than limit', isep)
-
- chunk = self._buffer[:isep + seplen]
- del self._buffer[:isep + seplen]
- self._maybe_resume_transport()
- return bytes(chunk)
-
- async def read(self, n=-1):
- """Read up to `n` bytes from the stream.
-
- If n is not provided, or set to -1, read until EOF and return all read
- bytes. If the EOF was received and the internal buffer is empty, return
- an empty bytes object.
-
- If n is zero, return empty bytes object immediately.
-
- If n is positive, this function tries to read `n` bytes, and may return
- fewer bytes than requested, but at least one byte. If EOF was
- received before any byte is read, this function returns an empty bytes
- object.
-
- The returned value is not limited by the limit configured at stream
- creation.
-
- If stream was paused, this function will automatically resume it if
- needed.
- """
-
- if self._exception is not None:
- raise self._exception
-
- if n == 0:
- return b''
-
- if n < 0:
- # This used to just loop creating a new waiter hoping to
- # collect everything in self._buffer, but that would
- # deadlock if the subprocess sends more than self.limit
- # bytes. So just call self.read(self._limit) until EOF.
- blocks = []
- while True:
- block = await self.read(self._limit)
- if not block:
- break
- blocks.append(block)
- return b''.join(blocks)
-
- if not self._buffer and not self._eof:
- await self._wait_for_data('read')
-
- # This will work right even if buffer is less than n bytes
- data = bytes(self._buffer[:n])
- del self._buffer[:n]
-
- self._maybe_resume_transport()
- return data
-
- async def readexactly(self, n):
- """Read exactly `n` bytes.
-
- Raise an IncompleteReadError if EOF is reached before `n` bytes can be
- read. The IncompleteReadError.partial attribute of the exception will
- contain the partial read bytes.
-
- If n is zero, return an empty bytes object.
-
- The returned value is not limited by the limit configured at stream
- creation.
-
- If stream was paused, this function will automatically resume it if
- needed.
- """
- if n < 0:
- raise ValueError('readexactly size can not be less than zero')
-
- if self._exception is not None:
- raise self._exception
-
- if n == 0:
- return b''
-
- while len(self._buffer) < n:
- if self._eof:
- incomplete = bytes(self._buffer)
- self._buffer.clear()
+ 'Separator is found, but chunk is longer than limit', isep)
+
+ chunk = self._buffer[:isep + seplen]
+ del self._buffer[:isep + seplen]
+ self._maybe_resume_transport()
+ return bytes(chunk)
+
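A sketch of the error contract described above, framing on a two-byte separator (the separator choice is illustrative):

    import asyncio

    async def read_frame(reader):
        try:
            frame = await reader.readuntil(b'\r\n')
        except asyncio.IncompleteReadError as e:
            return e.partial              # EOF arrived before the separator
        except asyncio.LimitOverrunError:
            # The data stays in the internal buffer, so the caller may
            # still drain or inspect it before giving up.
            raise ValueError('frame exceeds the configured limit')
        return frame[:-2]                 # strip the separator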
+ async def read(self, n=-1):
+ """Read up to `n` bytes from the stream.
+
+ If n is not provided, or set to -1, read until EOF and return all read
+ bytes. If the EOF was received and the internal buffer is empty, return
+ an empty bytes object.
+
+ If n is zero, return empty bytes object immediately.
+
+ If n is positive, this function tries to read `n` bytes, and may return
+ fewer bytes than requested, but at least one byte. If EOF was
+ received before any byte is read, this function returns an empty bytes
+ object.
+
+ The returned value is not limited by the limit configured at stream
+ creation.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+
+ if self._exception is not None:
+ raise self._exception
+
+ if n == 0:
+ return b''
+
+ if n < 0:
+ # This used to just loop creating a new waiter hoping to
+ # collect everything in self._buffer, but that would
+ # deadlock if the subprocess sends more than self.limit
+ # bytes. So just call self.read(self._limit) until EOF.
+ blocks = []
+ while True:
+ block = await self.read(self._limit)
+ if not block:
+ break
+ blocks.append(block)
+ return b''.join(blocks)
+
+ if not self._buffer and not self._eof:
+ await self._wait_for_data('read')
+
+ # This will work right even if buffer is less than n bytes
+ data = bytes(self._buffer[:n])
+ del self._buffer[:n]
+
+ self._maybe_resume_transport()
+ return data
+
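Because read(n) may return fewer than n bytes, callers loop until the empty result that signals EOF (a sketch; `sink` is a placeholder for any object with a write() method):

    async def copy_to(reader, sink, chunk_size=4096):
        while True:
            chunk = await reader.read(chunk_size)
            if not chunk:                 # b'' only at EOF
                break
            sink.write(chunk)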
+ async def readexactly(self, n):
+ """Read exactly `n` bytes.
+
+ Raise an IncompleteReadError if EOF is reached before `n` bytes can be
+ read. The IncompleteReadError.partial attribute of the exception will
+ contain the partial read bytes.
+
+ If n is zero, return an empty bytes object.
+
+ The returned value is not limited by the limit configured at stream
+ creation.
+
+ If stream was paused, this function will automatically resume it if
+ needed.
+ """
+ if n < 0:
+ raise ValueError('readexactly size can not be less than zero')
+
+ if self._exception is not None:
+ raise self._exception
+
+ if n == 0:
+ return b''
+
+ while len(self._buffer) < n:
+ if self._eof:
+ incomplete = bytes(self._buffer)
+ self._buffer.clear()
raise exceptions.IncompleteReadError(incomplete, n)
-
- await self._wait_for_data('readexactly')
-
- if len(self._buffer) == n:
- data = bytes(self._buffer)
- self._buffer.clear()
- else:
- data = bytes(self._buffer[:n])
- del self._buffer[:n]
- self._maybe_resume_transport()
- return data
-
- def __aiter__(self):
- return self
-
- async def __anext__(self):
- val = await self.readline()
- if val == b'':
- raise StopAsyncIteration
- return val
+
+ await self._wait_for_data('readexactly')
+
+ if len(self._buffer) == n:
+ data = bytes(self._buffer)
+ self._buffer.clear()
+ else:
+ data = bytes(self._buffer[:n])
+ del self._buffer[:n]
+ self._maybe_resume_transport()
+ return data
+
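A sketch of the all-or-IncompleteReadError contract above, using a hypothetical 4-byte length prefix:

    import struct

    async def read_length_prefixed(reader):
        header = await reader.readexactly(4)     # exactly 4 bytes or raises
        (size,) = struct.unpack('!I', header)    # network-order length
        return await reader.readexactly(size)    # body, same guarantee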
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ val = await self.readline()
+ if val == b'':
+ raise StopAsyncIteration
+ return val
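__aiter__() and __anext__() make the reader usable directly in async for; the loop ends when readline() returns b'' at EOF (a sketch):

    async def print_all(reader):
        async for line in reader:
            print(line.decode(errors='replace'), end='')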