summaryrefslogtreecommitdiffstats
path: root/contrib/python
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/python')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make2
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py5
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py20
-rw-r--r--contrib/python/ydb/py3/ydb/topic.py2
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
6 files changed, 24 insertions, 9 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 27f284ea7e7..e058dd2ebed 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.27.0
+Version: 3.28.0
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index 03ea8ae1f1c..f94ac2c0cfc 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.27.0)
+VERSION(3.28.0)
LICENSE(Apache-2.0)
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
index 38ee1be6598..e4545f3a606 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
@@ -57,7 +57,12 @@ class PublicReaderSettings:
update_token_interval: Union[int, float] = 3600
event_handler: Optional[EventHandler] = None
+ buffer_release_threshold: float = 0.5
+ """Min fraction of buffer_size_bytes to accumulate before sending a new ReadRequest (0.0 = immediately after every batch)."""
+
def __post_init__(self):
+ if not (0.0 <= self.buffer_release_threshold <= 1.0):
+ raise ValueError("buffer_release_threshold must be in [0.0, 1.0], got %s" % self.buffer_release_threshold)
# check possible create init message
_ = self._init_message()
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
index 5f5ac7a4706..f4e5a4f88a5 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
import concurrent.futures
import gzip
+import math
import typing
from asyncio import Task
from collections import defaultdict, OrderedDict
@@ -438,6 +439,8 @@ class ReaderStream:
_background_tasks: Set[asyncio.Task]
_partition_sessions: Dict[int, datatypes.PartitionSession]
_buffer_size_bytes: int # use for init request, then for debug purposes only
+ _min_buffer_release_bytes: int
+ _pending_buffer_release_bytes: int
_decode_executor: Optional[concurrent.futures.Executor]
_decoders: Dict[int, typing.Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes
@@ -471,6 +474,8 @@ class ReaderStream:
self._background_tasks = set()
self._partition_sessions = dict()
self._buffer_size_bytes = settings.buffer_size_bytes
+ self._min_buffer_release_bytes = math.ceil(settings.buffer_size_bytes * settings.buffer_release_threshold)
+ self._pending_buffer_release_bytes = 0
self._decode_executor = settings.decoder_executor
self._decoders = {Codec.CODEC_GZIP: gzip.decompress}
@@ -844,14 +849,17 @@ class ReaderStream:
self._buffer_size_bytes -= bytes_size
def _buffer_release_bytes(self, bytes_size):
- self._buffer_size_bytes += bytes_size
- self._stream.write(
- StreamReadMessage.FromClient(
- client_message=StreamReadMessage.ReadRequest(
- bytes_size=bytes_size,
+ self._pending_buffer_release_bytes += bytes_size
+ if self._pending_buffer_release_bytes >= self._min_buffer_release_bytes:
+ self._buffer_size_bytes += self._pending_buffer_release_bytes
+ self._stream.write(
+ StreamReadMessage.FromClient(
+ client_message=StreamReadMessage.ReadRequest(
+ bytes_size=self._pending_buffer_release_bytes,
+ )
)
)
- )
+ self._pending_buffer_release_bytes = 0
def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) -> typing.List[datatypes.PublicBatch]:
batches: typing.List[datatypes.PublicBatch] = []
diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py
index 89caddc91e9..98859293753 100644
--- a/contrib/python/ydb/py3/ydb/topic.py
+++ b/contrib/python/ydb/py3/ydb/topic.py
@@ -294,6 +294,7 @@ class TopicClientAsyncIO:
decoder_executor: Optional[concurrent.futures.Executor] = None,
auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
event_handler: Optional[TopicReaderEvents.EventHandler] = None,
+ buffer_release_threshold: float = 0.5,
) -> TopicReaderAsyncIO:
logger.debug("Create reader for topic=%s consumer=%s", topic, consumer)
@@ -629,6 +630,7 @@ class TopicClient:
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
event_handler: Optional[TopicReaderEvents.EventHandler] = None,
+ buffer_release_threshold: float = 0.5,
) -> TopicReader:
logger.debug("Create reader for topic=%s consumer=%s", topic, consumer)
if not decoder_executor:
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index 4daebf63f07..19ff2992179 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.27.0"
+VERSION = "3.28.0"