about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author rekby <rekby@ydb.tech> 2023-03-23 11:08:52 +0300
committer rekby <rekby@ydb.tech> 2023-03-23 11:08:52 +0300
commit 224cb4a695bb3ddff3ceda4f75e6965e83c12e76 (patch)
tree a770441fd74f7b6c3855c57ce92345b9864e21f2
parent 427223b94850669746be1c38fdad7cbc4323d1bf (diff)
download ydb-224cb4a695bb3ddff3ceda4f75e6965e83c12e76.tar.gz
update python ydb sdk to 3.0.1b13
-rw-r--r-- ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py 10
-rw-r--r-- ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py 6
-rw-r--r-- ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py 92
-rw-r--r-- ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py 9
-rw-r--r-- ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py 4
-rw-r--r-- ydb/public/sdk/python3/ydb/connection.py 7
-rw-r--r-- ydb/public/sdk/python3/ydb/topic.py 11
-rw-r--r-- ydb/public/sdk/python3/ydb/ydb_version.py 2
8 files changed, 123 insertions, 18 deletions
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
index d9cd87fc978..f95f7976e6f 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -61,11 +61,19 @@ class PublicAsyncIOReader:
_loop: asyncio.AbstractEventLoop
_closed: bool
_reconnector: ReaderReconnector
+ _parent: typing.Any # need for prevent close parent client by GC
- def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings):
+ def __init__(
+ self,
+ driver: Driver,
+ settings: topic_reader.PublicReaderSettings,
+ *,
+ _parent=None,
+ ):
self._loop = asyncio.get_running_loop()
self._closed = False
self._reconnector = ReaderReconnector(driver, settings)
+ self._parent = _parent
async def __aenter__(self):
return self
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
index ca6fde92861..28dfd004376 100644
--- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
+++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py
@@ -25,6 +25,7 @@ class TopicReaderSync:
_caller: CallFromSyncToAsync
_async_reader: PublicAsyncIOReader
_closed: bool
+ _parent: typing.Any # need for prevent stop the client by GC
def __init__(
self,
@@ -32,6 +33,7 @@ class TopicReaderSync:
settings: PublicReaderSettings,
*,
eventloop: Optional[asyncio.AbstractEventLoop] = None,
+ _parent=None, # need for prevent stop the client by GC
):
self._closed = False
@@ -49,6 +51,8 @@ class TopicReaderSync:
create_reader(), loop
).result()
+ self._parent = _parent
+
def __del__(self):
self.close(flush=False)
@@ -122,7 +126,7 @@ class TopicReaderSync:
"""
self._check_closed()
- self._caller.call_sync(self._async_reader.commit(mess))
+ self._caller.call_sync(lambda: self._async_reader.commit(mess))
def commit_with_ack(
self,
diff --git a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py
index b94ff46bec7..c24ee259ee2 100644
--- a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py
+++ b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py
@@ -1,6 +1,7 @@
import concurrent.futures
import datetime
import enum
+import itertools
import uuid
from dataclasses import dataclass
from enum import Enum
@@ -12,6 +13,7 @@ import ydb.aio
from .._grpc.grpcwrapper.ydb_topic import StreamWriteMessage
from .._grpc.grpcwrapper.common_utils import IToProto
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec
+from .. import connection
Message = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
@@ -200,14 +202,94 @@ def default_serializer_message_content(data: Any) -> bytes:
def messages_to_proto_requests(
messages: List[InternalMessage],
) -> List[StreamWriteMessage.FromClient]:
- # todo split by proto message size and codec
- res = []
- for msg in messages:
+
+ gropus = _slit_messages_for_send(messages)
+
+ res = [] # type: List[StreamWriteMessage.FromClient]
+ for group in gropus:
req = StreamWriteMessage.FromClient(
StreamWriteMessage.WriteRequest(
- messages=[msg.to_message_data()],
- codec=msg.codec,
+ messages=list(map(InternalMessage.to_message_data, group)),
+ codec=group[0].codec,
)
)
res.append(req)
return res
+
+
+_max_int = 2**63 - 1
+
+_message_data_overhead = (
+ StreamWriteMessage.FromClient(
+ StreamWriteMessage.WriteRequest(
+ messages=[
+ StreamWriteMessage.WriteRequest.MessageData(
+ seq_no=_max_int,
+ created_at=datetime.datetime(3000, 1, 1, 1, 1, 1, 1),
+ data=bytes(1),
+ uncompressed_size=_max_int,
+ partitioning=StreamWriteMessage.PartitioningMessageGroupID(
+ message_group_id="a" * 100,
+ ),
+ ),
+ ],
+ codec=20000,
+ )
+ )
+ .to_proto()
+ .ByteSize()
+)
+
+
+def _slit_messages_for_send(
+ messages: List[InternalMessage],
+) -> List[List[InternalMessage]]:
+ codec_groups = [] # type: List[List[InternalMessage]]
+ for _, messages in itertools.groupby(messages, lambda x: x.codec):
+ codec_groups.append(list(messages))
+
+ res = [] # type: List[List[InternalMessage]]
+ for codec_group in codec_groups:
+ group_by_size = _split_messages_by_size_with_default_overhead(codec_group)
+ res.extend(group_by_size)
+ return res
+
+
+def _split_messages_by_size_with_default_overhead(
+ messages: List[InternalMessage],
+) -> List[List[InternalMessage]]:
+ def get_message_size(msg: InternalMessage):
+ return len(msg.data) + _message_data_overhead
+
+ return _split_messages_by_size(
+ messages, connection._DEFAULT_MAX_GRPC_MESSAGE_SIZE, get_message_size
+ )
+
+
+def _split_messages_by_size(
+ messages: List[InternalMessage],
+ split_size: int,
+ get_msg_size: typing.Callable[[InternalMessage], int],
+) -> List[List[InternalMessage]]:
+ res = []
+ group = []
+ group_size = 0
+
+ for msg in messages:
+ msg_size = get_msg_size(msg)
+
+ if len(group) == 0:
+ group.append(msg)
+ group_size += msg_size
+ elif group_size + msg_size <= split_size:
+ group.append(msg)
+ group_size += msg_size
+ else:
+ res.append(group)
+ group = [msg]
+ group_size = msg_size
+
+ if len(group) > 0:
+ res.append(group)
+
+ return res
diff --git a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py
index bc91312382f..481b797f65b 100644
--- a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py
+++ b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py
@@ -47,13 +47,20 @@ class WriterAsyncIO:
_loop: asyncio.AbstractEventLoop
_reconnector: "WriterAsyncIOReconnector"
_closed: bool
+ _parent: typing.Any # need for prevent close parent client by GC
- def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings):
+ def __init__(
+ self,
+ driver: SupportedDriverType,
+ settings: PublicWriterSettings,
+ _client=None,
+ ):
self._loop = asyncio.get_running_loop()
self._closed = False
self._reconnector = WriterAsyncIOReconnector(
driver=driver, settings=WriterSettings(settings)
)
+ self._parent = _client
async def __aenter__(self) -> "WriterAsyncIO":
return self
diff --git a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py
index de0ec41d097..7eed3a47774 100644
--- a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py
+++ b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
+import typing
from concurrent.futures import Future
from typing import Union, List, Optional
@@ -25,6 +26,7 @@ class WriterSync:
_caller: CallFromSyncToAsync
_async_writer: WriterAsyncIO
_closed: bool
+ _parent: typing.Any # need for prevent close parent client by GC
def __init__(
self,
@@ -32,6 +34,7 @@ class WriterSync:
settings: PublicWriterSettings,
*,
eventloop: Optional[asyncio.AbstractEventLoop] = None,
+ _parent=None,
):
self._closed = False
@@ -49,6 +52,7 @@ class WriterSync:
self._async_writer = self._caller.safe_call_with_result(
create_async_writer(), None
)
+ self._parent = _parent
def __enter__(self):
return self
diff --git a/ydb/public/sdk/python3/ydb/connection.py b/ydb/public/sdk/python3/ydb/connection.py
index 95db084a3cb..25a54bb7779 100644
--- a/ydb/public/sdk/python3/ydb/connection.py
+++ b/ydb/public/sdk/python3/ydb/connection.py
@@ -24,6 +24,8 @@ YDB_DATABASE_HEADER = "x-ydb-database"
YDB_TRACE_ID_HEADER = "x-ydb-trace-id"
YDB_REQUEST_TYPE_HEADER = "x-ydb-request-type"
+_DEFAULT_MAX_GRPC_MESSAGE_SIZE = 64 * 10**6
+
def _message_to_string(message):
"""
@@ -179,10 +181,9 @@ def _construct_channel_options(driver_config, endpoint_options=None):
:param endpoint_options: Endpoint options
:return: A channel initialization options
"""
- _max_message_size = 64 * 10**6
_default_connect_options = [
- ("grpc.max_receive_message_length", _max_message_size),
- ("grpc.max_send_message_length", _max_message_size),
+ ("grpc.max_receive_message_length", _DEFAULT_MAX_GRPC_MESSAGE_SIZE),
+ ("grpc.max_send_message_length", _DEFAULT_MAX_GRPC_MESSAGE_SIZE),
("grpc.primary_user_agent", driver_config.primary_user_agent),
(
"grpc.lb_policy_name",
diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py
index 7d983540f1c..c8c4ffef5a6 100644
--- a/ydb/public/sdk/python3/ydb/topic.py
+++ b/ydb/public/sdk/python3/ydb/topic.py
@@ -44,6 +44,7 @@ from ._topic_writer.topic_writer import ( # noqa: F401
RetryPolicy as TopicWriterRetryPolicy,
)
+from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO
from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter
from ._topic_common.common import (
@@ -51,8 +52,6 @@ from ._topic_common.common import (
create_result_wrapper as _create_result_wrapper,
)
-from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO
-
from ._grpc.grpcwrapper import ydb_topic as _ydb_topic
from ._grpc.grpcwrapper import ydb_topic_public_types as _ydb_topic_public_types
from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401
@@ -174,7 +173,7 @@ class TopicClientAsyncIO:
settings = TopicReaderSettings(**args)
- return TopicReaderAsyncIO(self._driver, settings)
+ return TopicReaderAsyncIO(self._driver, settings, _parent=self)
def writer(
self,
@@ -201,7 +200,7 @@ class TopicClientAsyncIO:
if not settings.encoder_executor:
settings.encoder_executor = self._executor
- return TopicWriterAsyncIO(self._driver, settings)
+ return TopicWriterAsyncIO(self._driver, settings, _client=self)
def close(self):
if self._closed:
@@ -331,7 +330,7 @@ class TopicClient:
settings = TopicReaderSettings(**args)
- return TopicReader(self._driver, settings)
+ return TopicReader(self._driver, settings, _parent=self)
def writer(
self,
@@ -359,7 +358,7 @@ class TopicClient:
if not settings.encoder_executor:
settings.encoder_executor = self._executor
- return TopicWriter(self._driver, settings)
+ return TopicWriter(self._driver, settings, _parent=self)
def close(self):
if self._closed:
diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py
index 4d649d59b48..30904512e7a 100644
--- a/ydb/public/sdk/python3/ydb/ydb_version.py
+++ b/ydb/public/sdk/python3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.0.1b12"
+VERSION = "3.0.1b13"