diff options
author | rekby <rekby@ydb.tech> | 2023-03-23 11:08:52 +0300 |
---|---|---|
committer | rekby <rekby@ydb.tech> | 2023-03-23 11:08:52 +0300 |
commit | 224cb4a695bb3ddff3ceda4f75e6965e83c12e76 (patch) | |
tree | a770441fd74f7b6c3855c57ce92345b9864e21f2 | |
parent | 427223b94850669746be1c38fdad7cbc4323d1bf (diff) | |
download | ydb-224cb4a695bb3ddff3ceda4f75e6965e83c12e76.tar.gz |
update python ydb sdk to 3.0.1b13
8 files changed, 123 insertions, 18 deletions
diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py index d9cd87fc978..f95f7976e6f 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_asyncio.py @@ -61,11 +61,19 @@ class PublicAsyncIOReader: _loop: asyncio.AbstractEventLoop _closed: bool _reconnector: ReaderReconnector + _parent: typing.Any # need for prevent close parent client by GC - def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): + def __init__( + self, + driver: Driver, + settings: topic_reader.PublicReaderSettings, + *, + _parent=None, + ): self._loop = asyncio.get_running_loop() self._closed = False self._reconnector = ReaderReconnector(driver, settings) + self._parent = _parent async def __aenter__(self): return self diff --git a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py index ca6fde92861..28dfd004376 100644 --- a/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/public/sdk/python3/ydb/_topic_reader/topic_reader_sync.py @@ -25,6 +25,7 @@ class TopicReaderSync: _caller: CallFromSyncToAsync _async_reader: PublicAsyncIOReader _closed: bool + _parent: typing.Any # need for prevent stop the client by GC def __init__( self, @@ -32,6 +33,7 @@ class TopicReaderSync: settings: PublicReaderSettings, *, eventloop: Optional[asyncio.AbstractEventLoop] = None, + _parent=None, # need for prevent stop the client by GC ): self._closed = False @@ -49,6 +51,8 @@ class TopicReaderSync: create_reader(), loop ).result() + self._parent = _parent + def __del__(self): self.close(flush=False) @@ -122,7 +126,7 @@ class TopicReaderSync: """ self._check_closed() - self._caller.call_sync(self._async_reader.commit(mess)) + self._caller.call_sync(lambda: self._async_reader.commit(mess)) def commit_with_ack( self, diff --git a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py index b94ff46bec7..c24ee259ee2 100644 --- a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py +++ b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer.py @@ -1,6 +1,7 @@ import concurrent.futures import datetime import enum +import itertools import uuid from dataclasses import dataclass from enum import Enum @@ -12,6 +13,7 @@ import ydb.aio from .._grpc.grpcwrapper.ydb_topic import StreamWriteMessage from .._grpc.grpcwrapper.common_utils import IToProto from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec +from .. import connection Message = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"] @@ -200,14 +202,94 @@ def default_serializer_message_content(data: Any) -> bytes: def messages_to_proto_requests( messages: List[InternalMessage], ) -> List[StreamWriteMessage.FromClient]: - # todo split by proto message size and codec - res = [] - for msg in messages: + + gropus = _slit_messages_for_send(messages) + + res = [] # type: List[StreamWriteMessage.FromClient] + for group in gropus: req = StreamWriteMessage.FromClient( StreamWriteMessage.WriteRequest( - messages=[msg.to_message_data()], - codec=msg.codec, + messages=list(map(InternalMessage.to_message_data, group)), + codec=group[0].codec, ) ) res.append(req) return res + + +_max_int = 2**63 - 1 + +_message_data_overhead = ( + StreamWriteMessage.FromClient( + StreamWriteMessage.WriteRequest( + messages=[ + StreamWriteMessage.WriteRequest.MessageData( + seq_no=_max_int, + created_at=datetime.datetime(3000, 1, 1, 1, 1, 1, 1), + data=bytes(1), + uncompressed_size=_max_int, + partitioning=StreamWriteMessage.PartitioningMessageGroupID( + message_group_id="a" * 100, + ), + ), + ], + codec=20000, + ) + ) + .to_proto() + .ByteSize() +) + + +def _slit_messages_for_send( + messages: List[InternalMessage], +) -> List[List[InternalMessage]]: + codec_groups = [] # type: List[List[InternalMessage]] + for _, messages in itertools.groupby(messages, lambda x: x.codec): + codec_groups.append(list(messages)) + + res = [] # type: List[List[InternalMessage]] + for codec_group in codec_groups: + group_by_size = _split_messages_by_size_with_default_overhead(codec_group) + res.extend(group_by_size) + return res + + +def _split_messages_by_size_with_default_overhead( + messages: List[InternalMessage], +) -> List[List[InternalMessage]]: + def get_message_size(msg: InternalMessage): + return len(msg.data) + _message_data_overhead + + return _split_messages_by_size( + messages, connection._DEFAULT_MAX_GRPC_MESSAGE_SIZE, get_message_size + ) + + +def _split_messages_by_size( + messages: List[InternalMessage], + split_size: int, + get_msg_size: typing.Callable[[InternalMessage], int], +) -> List[List[InternalMessage]]: + res = [] + group = [] + group_size = 0 + + for msg in messages: + msg_size = get_msg_size(msg) + + if len(group) == 0: + group.append(msg) + group_size += msg_size + elif group_size + msg_size <= split_size: + group.append(msg) + group_size += msg_size + else: + res.append(group) + group = [msg] + group_size = msg_size + + if len(group) > 0: + res.append(group) + + return res diff --git a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py index bc91312382f..481b797f65b 100644 --- a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_asyncio.py @@ -47,13 +47,20 @@ class WriterAsyncIO: _loop: asyncio.AbstractEventLoop _reconnector: "WriterAsyncIOReconnector" _closed: bool + _parent: typing.Any # need for prevent close parent client by GC - def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings): + def __init__( + self, + driver: SupportedDriverType, + settings: PublicWriterSettings, + _client=None, + ): self._loop = asyncio.get_running_loop() self._closed = False self._reconnector = WriterAsyncIOReconnector( driver=driver, settings=WriterSettings(settings) ) + self._parent = _client async def __aenter__(self) -> "WriterAsyncIO": return self diff --git a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py index de0ec41d097..7eed3a47774 100644 --- a/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/public/sdk/python3/ydb/_topic_writer/topic_writer_sync.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import typing from concurrent.futures import Future from typing import Union, List, Optional @@ -25,6 +26,7 @@ class WriterSync: _caller: CallFromSyncToAsync _async_writer: WriterAsyncIO _closed: bool + _parent: typing.Any # need for prevent close parent client by GC def __init__( self, @@ -32,6 +34,7 @@ class WriterSync: settings: PublicWriterSettings, *, eventloop: Optional[asyncio.AbstractEventLoop] = None, + _parent=None, ): self._closed = False @@ -49,6 +52,7 @@ class WriterSync: self._async_writer = self._caller.safe_call_with_result( create_async_writer(), None ) + self._parent = _parent def __enter__(self): return self diff --git a/ydb/public/sdk/python3/ydb/connection.py b/ydb/public/sdk/python3/ydb/connection.py index 95db084a3cb..25a54bb7779 100644 --- a/ydb/public/sdk/python3/ydb/connection.py +++ b/ydb/public/sdk/python3/ydb/connection.py @@ -24,6 +24,8 @@ YDB_DATABASE_HEADER = "x-ydb-database" YDB_TRACE_ID_HEADER = "x-ydb-trace-id" YDB_REQUEST_TYPE_HEADER = "x-ydb-request-type" +_DEFAULT_MAX_GRPC_MESSAGE_SIZE = 64 * 10**6 + def _message_to_string(message): """ @@ -179,10 +181,9 @@ def _construct_channel_options(driver_config, endpoint_options=None): :param endpoint_options: Endpoint options :return: A channel initialization options """ - _max_message_size = 64 * 10**6 _default_connect_options = [ - ("grpc.max_receive_message_length", _max_message_size), - ("grpc.max_send_message_length", _max_message_size), + ("grpc.max_receive_message_length", _DEFAULT_MAX_GRPC_MESSAGE_SIZE), + ("grpc.max_send_message_length", _DEFAULT_MAX_GRPC_MESSAGE_SIZE), ("grpc.primary_user_agent", driver_config.primary_user_agent), ( "grpc.lb_policy_name", diff --git a/ydb/public/sdk/python3/ydb/topic.py b/ydb/public/sdk/python3/ydb/topic.py index 7d983540f1c..c8c4ffef5a6 100644 --- a/ydb/public/sdk/python3/ydb/topic.py +++ b/ydb/public/sdk/python3/ydb/topic.py @@ -44,6 +44,7 @@ from ._topic_writer.topic_writer import ( # noqa: F401 RetryPolicy as TopicWriterRetryPolicy, ) +from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter from ._topic_common.common import ( @@ -51,8 +52,6 @@ from ._topic_common.common import ( create_result_wrapper as _create_result_wrapper, ) -from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO - from ._grpc.grpcwrapper import ydb_topic as _ydb_topic from ._grpc.grpcwrapper import ydb_topic_public_types as _ydb_topic_public_types from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401 @@ -174,7 +173,7 @@ class TopicClientAsyncIO: settings = TopicReaderSettings(**args) - return TopicReaderAsyncIO(self._driver, settings) + return TopicReaderAsyncIO(self._driver, settings, _parent=self) def writer( self, @@ -201,7 +200,7 @@ class TopicClientAsyncIO: if not settings.encoder_executor: settings.encoder_executor = self._executor - return TopicWriterAsyncIO(self._driver, settings) + return TopicWriterAsyncIO(self._driver, settings, _client=self) def close(self): if self._closed: @@ -331,7 +330,7 @@ class TopicClient: settings = TopicReaderSettings(**args) - return TopicReader(self._driver, settings) + return TopicReader(self._driver, settings, _parent=self) def writer( self, @@ -359,7 +358,7 @@ class TopicClient: if not settings.encoder_executor: settings.encoder_executor = self._executor - return TopicWriter(self._driver, settings) + return TopicWriter(self._driver, settings, _parent=self) def close(self): if self._closed: diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py index 4d649d59b48..30904512e7a 100644 --- a/ydb/public/sdk/python3/ydb/ydb_version.py +++ b/ydb/public/sdk/python3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.0.1b12" +VERSION = "3.0.1b13" |