diff options
| author | robot-piglet <[email protected]> | 2026-04-10 18:04:51 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2026-04-10 18:34:34 +0300 |
| commit | 26a9785973fa55a8db2df2b542cc3f7ace2cfbde (patch) | |
| tree | f24be67d4b9cec4f79a5f71ffaaf2fc82b9c54c4 /contrib/python | |
| parent | 8b9bd0a4fa6deee61c718d5868cdc8b08562064e (diff) | |
Intermediate changes
commit_hash:8101acd76d22f5edc5a11f1e5ab97182cffb083b
Diffstat (limited to 'contrib/python')
| -rw-r--r-- | contrib/python/ydb/py3/.dist-info/METADATA | 2 | ||||
| -rw-r--r-- | contrib/python/ydb/py3/ya.make | 2 | ||||
| -rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py | 5 | ||||
| -rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py | 20 | ||||
| -rw-r--r-- | contrib/python/ydb/py3/ydb/topic.py | 2 | ||||
| -rw-r--r-- | contrib/python/ydb/py3/ydb/ydb_version.py | 2 |
6 files changed, 24 insertions, 9 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 27f284ea7e7..e058dd2ebed 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.27.0 +Version: 3.28.0 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index 03ea8ae1f1c..f94ac2c0cfc 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.27.0) +VERSION(3.28.0) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py index 38ee1be6598..e4545f3a606 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py @@ -57,7 +57,12 @@ class PublicReaderSettings: update_token_interval: Union[int, float] = 3600 event_handler: Optional[EventHandler] = None + buffer_release_threshold: float = 0.5 + """Min fraction of buffer_size_bytes to accumulate before sending a new ReadRequest (0.0 = immediately after every batch).""" + def __post_init__(self): + if not (0.0 <= self.buffer_release_threshold <= 1.0): + raise ValueError("buffer_release_threshold must be in [0.0, 1.0], got %s" % self.buffer_release_threshold) # check possible create init message _ = self._init_message() diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index 5f5ac7a4706..f4e5a4f88a5 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import concurrent.futures import gzip +import math import typing from asyncio import Task from collections import defaultdict, OrderedDict @@ -438,6 +439,8 @@ class ReaderStream: _background_tasks: Set[asyncio.Task] _partition_sessions: Dict[int, datatypes.PartitionSession] _buffer_size_bytes: int # use for init request, then for debug purposes only + _min_buffer_release_bytes: int + _pending_buffer_release_bytes: int _decode_executor: Optional[concurrent.futures.Executor] _decoders: Dict[int, typing.Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes @@ -471,6 +474,8 @@ class ReaderStream: self._background_tasks = set() self._partition_sessions = dict() self._buffer_size_bytes = settings.buffer_size_bytes + self._min_buffer_release_bytes = math.ceil(settings.buffer_size_bytes * settings.buffer_release_threshold) + self._pending_buffer_release_bytes = 0 self._decode_executor = settings.decoder_executor self._decoders = {Codec.CODEC_GZIP: gzip.decompress} @@ -844,14 +849,17 @@ class ReaderStream: self._buffer_size_bytes -= bytes_size def _buffer_release_bytes(self, bytes_size): - self._buffer_size_bytes += bytes_size - self._stream.write( - StreamReadMessage.FromClient( - client_message=StreamReadMessage.ReadRequest( - bytes_size=bytes_size, + self._pending_buffer_release_bytes += bytes_size + if self._pending_buffer_release_bytes >= self._min_buffer_release_bytes: + self._buffer_size_bytes += self._pending_buffer_release_bytes + self._stream.write( + StreamReadMessage.FromClient( + client_message=StreamReadMessage.ReadRequest( + bytes_size=self._pending_buffer_release_bytes, + ) ) ) - ) + self._pending_buffer_release_bytes = 0 def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) -> typing.List[datatypes.PublicBatch]: batches: typing.List[datatypes.PublicBatch] = [] diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py index 89caddc91e9..98859293753 100644 --- a/contrib/python/ydb/py3/ydb/topic.py +++ b/contrib/python/ydb/py3/ydb/topic.py @@ -294,6 +294,7 @@ class TopicClientAsyncIO: decoder_executor: Optional[concurrent.futures.Executor] = None, auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. event_handler: Optional[TopicReaderEvents.EventHandler] = None, + buffer_release_threshold: float = 0.5, ) -> TopicReaderAsyncIO: logger.debug("Create reader for topic=%s consumer=%s", topic, consumer) @@ -629,6 +630,7 @@ class TopicClient: decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. event_handler: Optional[TopicReaderEvents.EventHandler] = None, + buffer_release_threshold: float = 0.5, ) -> TopicReader: logger.debug("Create reader for topic=%s consumer=%s", topic, consumer) if not decoder_executor: diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 4daebf63f07..19ff2992179 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.27.0" +VERSION = "3.28.0" |
