diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-02-14 00:51:34 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-02-14 00:51:34 +0000 |
commit | 7c59c24919f9e86614d1cd19c62829e01dd54097 (patch) | |
tree | d42e64eb38c0388a0f4f3ad148bad8938324e279 /contrib | |
parent | 28180f60aec6dcb2b662b6417c90226553ebe2dc (diff) | |
parent | c26bb8abd161c590e8cb0e7280a14c335c1eb893 (diff) | |
download | ydb-7c59c24919f9e86614d1cd19c62829e01dd54097.tar.gz |
Merge branch 'rightlib' into merge-libs-250214-0050
Diffstat (limited to 'contrib')
-rw-r--r-- | contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h | 20 | ||||
-rw-r--r-- | contrib/python/ydb/py3/.dist-info/METADATA | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ya.make | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py | 214 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py | 33 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py | 11 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py | 29 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/pool.py | 35 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/import_client.py | 5 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/pool.py | 35 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/scheme.py | 60 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/topic.py | 16 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/ydb_version.py | 2 |
14 files changed, 453 insertions, 13 deletions
diff --git a/contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h b/contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h index 1fbd7d3a2c..694406acc9 100644 --- a/contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h +++ b/contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h @@ -28,6 +28,16 @@ #endif +#ifndef POCO_CRYPT_NO_SANITIZE_THREAD + #define POCO_CRYPT_NO_SANITIZE_THREAD + #if defined(__has_feature) + #if __has_feature(thread_sanitizer) + #undef POCO_CRYPT_NO_SANITIZE_THREAD + #define POCO_CRYPT_NO_SANITIZE_THREAD __attribute__((no_sanitize_thread)) + #endif + #endif +#endif + extern "C" { struct CRYPTO_dynlock_value @@ -50,14 +60,14 @@ class Crypto_API OpenSSLInitializer public: OpenSSLInitializer(); /// Automatically initialize OpenSSL on startup. - + ~OpenSSLInitializer(); /// Automatically shut down OpenSSL on exit. - - static void initialize(); + + POCO_CRYPT_NO_SANITIZE_THREAD static void initialize(); /// Initializes the OpenSSL machinery. - static void uninitialize(); + POCO_CRYPT_NO_SANITIZE_THREAD static void uninitialize(); /// Shuts down the OpenSSL machinery. static bool isFIPSEnabled(); @@ -71,7 +81,7 @@ protected: { SEEDSIZE = 256 }; - + // OpenSSL multithreading support static void lock(int mode, int n, const char* file, int line); static unsigned long id(); diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 181a3da9c5..aa17898a66 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.18.15 +Version: 3.18.16 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index 1da9bb152c..8b10140b34 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.18.15) +VERSION(3.18.16) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py index 972003989c..d1872f4245 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -418,12 +418,14 @@ class StreamReadMessage: class InitRequest(IToProto): topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"] consumer: str + auto_partitioning_support: bool def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest: res = ydb_topic_pb2.StreamReadMessage.InitRequest() res.consumer = self.consumer for settings in self.topics_read_settings: res.topics_read_settings.append(settings.to_proto()) + res.auto_partitioning_support = self.auto_partitioning_support return res @dataclass @@ -696,6 +698,20 @@ class StreamReadMessage: ) @dataclass + class EndPartitionSession(IFromProto): + partition_session_id: int + adjacent_partition_ids: List[int] + child_partition_ids: List[int] + + @staticmethod + def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession): + return StreamReadMessage.EndPartitionSession( + partition_session_id=msg.partition_session_id, + adjacent_partition_ids=list(msg.adjacent_partition_ids), + child_partition_ids=list(msg.child_partition_ids), + ) + + @dataclass class FromClient(IToProto): client_message: "ReaderMessagesFromClientToServer" @@ -774,6 +790,13 @@ class StreamReadMessage: msg.partition_session_status_response ), ) + elif mess_type == "end_partition_session": + return StreamReadMessage.FromServer( + server_status=server_status, + server_message=StreamReadMessage.EndPartitionSession.from_proto( + msg.end_partition_session, + ), + ) else: raise issues.UnexpectedGrpcMessage( "Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type @@ -798,6 +821,7 @@ ReaderMessagesFromServerToClient = Union[ UpdateTokenResponse, StreamReadMessage.StartPartitionSessionRequest, StreamReadMessage.StopPartitionSessionRequest, + StreamReadMessage.EndPartitionSession, ] @@ -942,18 +966,130 @@ class AlterConsumer(IToProto, IFromPublic): class PartitioningSettings(IToProto, IFromProto): min_active_partitions: int partition_count_limit: int + max_active_partitions: int + auto_partitioning_settings: AutoPartitioningSettings @staticmethod def from_proto(msg: ydb_topic_pb2.PartitioningSettings) -> "PartitioningSettings": return PartitioningSettings( min_active_partitions=msg.min_active_partitions, partition_count_limit=msg.partition_count_limit, + max_active_partitions=msg.max_active_partitions, + auto_partitioning_settings=AutoPartitioningSettings.from_proto(msg.auto_partitioning_settings), ) def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: + auto_partitioning_settings = None + if self.auto_partitioning_settings is not None: + auto_partitioning_settings = self.auto_partitioning_settings.to_proto() + return ydb_topic_pb2.PartitioningSettings( min_active_partitions=self.min_active_partitions, partition_count_limit=self.partition_count_limit, + max_active_partitions=self.max_active_partitions, + auto_partitioning_settings=auto_partitioning_settings, + ) + + +class AutoPartitioningStrategy(int, IFromProto, IFromPublic, IToPublic): + UNSPECIFIED = 0 + DISABLED = 1 + SCALE_UP = 2 + SCALE_UP_AND_DOWN = 3 + PAUSED = 4 + + @staticmethod + def from_public( + strategy: Optional[ydb_topic_public_types.PublicAutoPartitioningStrategy], + ) -> Optional["AutoPartitioningStrategy"]: + if strategy is None: + return None + + return AutoPartitioningStrategy(strategy) + + @staticmethod + def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]: + if code is None: + return None + + return AutoPartitioningStrategy(code) + + def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy: + try: + return ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self)) + except KeyError: + return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED + + +@dataclass +class AutoPartitioningSettings(IToProto, IFromProto, IFromPublic, IToPublic): + strategy: AutoPartitioningStrategy + partition_write_speed: AutoPartitioningWriteSpeedStrategy + + @staticmethod + def from_public( + settings: Optional[ydb_topic_public_types.PublicAutoPartitioningSettings], + ) -> Optional[AutoPartitioningSettings]: + if not settings: + return None + + return AutoPartitioningSettings( + strategy=settings.strategy, + partition_write_speed=AutoPartitioningWriteSpeedStrategy( + stabilization_window=settings.stabilization_window, + up_utilization_percent=settings.up_utilization_percent, + down_utilization_percent=settings.down_utilization_percent, + ), + ) + + @staticmethod + def from_proto(msg: ydb_topic_pb2.AutoPartitioningSettings) -> AutoPartitioningSettings: + if msg is None: + return None + + return AutoPartitioningSettings( + strategy=AutoPartitioningStrategy.from_proto(msg.strategy), + partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto(msg.partition_write_speed), + ) + + def to_proto(self) -> ydb_topic_pb2.AutoPartitioningSettings: + return ydb_topic_pb2.AutoPartitioningSettings( + strategy=self.strategy, partition_write_speed=self.partition_write_speed.to_proto() + ) + + def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningSettings: + return ydb_topic_public_types.PublicAutoPartitioningSettings( + strategy=self.strategy.to_public(), + stabilization_window=self.partition_write_speed.stabilization_window, + up_utilization_percent=self.partition_write_speed.up_utilization_percent, + down_utilization_percent=self.partition_write_speed.down_utilization_percent, + ) + + +@dataclass +class AutoPartitioningWriteSpeedStrategy(IToProto, IFromProto): + stabilization_window: Optional[datetime.timedelta] + up_utilization_percent: Optional[int] + down_utilization_percent: Optional[int] + + def to_proto(self): + return ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy( + stabilization_window=proto_duration_from_timedelta(self.stabilization_window), + up_utilization_percent=self.up_utilization_percent, + down_utilization_percent=self.down_utilization_percent, + ) + + @staticmethod + def from_proto( + msg: Optional[ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy], + ) -> Optional[AutoPartitioningWriteSpeedStrategy]: + if msg is None: + return None + + return AutoPartitioningWriteSpeedStrategy( + stabilization_window=timedelta_from_proto_duration(msg.stabilization_window), + up_utilization_percent=msg.up_utilization_percent, + down_utilization_percent=msg.down_utilization_percent, ) @@ -961,11 +1097,65 @@ class PartitioningSettings(IToProto, IFromProto): class AlterPartitioningSettings(IToProto): set_min_active_partitions: Optional[int] set_partition_count_limit: Optional[int] + set_max_active_partitions: Optional[int] + alter_auto_partitioning_settings: Optional[AlterAutoPartitioningSettings] def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings: + alter_auto_partitioning_settings = None + if self.alter_auto_partitioning_settings is not None: + alter_auto_partitioning_settings = self.alter_auto_partitioning_settings.to_proto() + return ydb_topic_pb2.AlterPartitioningSettings( set_min_active_partitions=self.set_min_active_partitions, set_partition_count_limit=self.set_partition_count_limit, + set_max_active_partitions=self.set_max_active_partitions, + alter_auto_partitioning_settings=alter_auto_partitioning_settings, + ) + + +@dataclass +class AlterAutoPartitioningSettings(IToProto, IFromPublic): + set_strategy: Optional[AutoPartitioningStrategy] + set_partition_write_speed: Optional[AlterAutoPartitioningWriteSpeedStrategy] + + @staticmethod + def from_public( + settings: Optional[ydb_topic_public_types.PublicAlterAutoPartitioningSettings], + ) -> Optional[AlterAutoPartitioningSettings]: + if not settings: + return None + + return AlterAutoPartitioningSettings( + set_strategy=settings.set_strategy, + set_partition_write_speed=AlterAutoPartitioningWriteSpeedStrategy( + set_stabilization_window=settings.set_stabilization_window, + set_up_utilization_percent=settings.set_up_utilization_percent, + set_down_utilization_percent=settings.set_down_utilization_percent, + ), + ) + + def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningSettings: + set_partition_write_speed = None + if self.set_partition_write_speed: + set_partition_write_speed = self.set_partition_write_speed.to_proto() + + return ydb_topic_pb2.AlterAutoPartitioningSettings( + set_strategy=self.set_strategy, + set_partition_write_speed=set_partition_write_speed, + ) + + +@dataclass +class AlterAutoPartitioningWriteSpeedStrategy(IToProto): + set_stabilization_window: Optional[datetime.timedelta] + set_up_utilization_percent: Optional[int] + set_down_utilization_percent: Optional[int] + + def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy: + return ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy( + set_stabilization_window=proto_duration_from_timedelta(self.set_stabilization_window), + set_up_utilization_percent=self.set_up_utilization_percent, + set_down_utilization_percent=self.set_down_utilization_percent, ) @@ -992,7 +1182,7 @@ class MeteringMode(int, IFromProto, IFromPublic, IToPublic): def to_public(self) -> ydb_topic_public_types.PublicMeteringMode: try: - ydb_topic_public_types.PublicMeteringMode(int(self)) + return ydb_topic_public_types.PublicMeteringMode(int(self)) except KeyError: return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED @@ -1011,9 +1201,13 @@ class CreateTopicRequest(IToProto, IFromPublic): metering_mode: "MeteringMode" def to_proto(self) -> ydb_topic_pb2.CreateTopicRequest: + partitioning_settings = None + if self.partitioning_settings is not None: + partitioning_settings = self.partitioning_settings.to_proto() + return ydb_topic_pb2.CreateTopicRequest( path=self.path, - partitioning_settings=self.partitioning_settings.to_proto(), + partitioning_settings=partitioning_settings, retention_period=proto_duration_from_timedelta(self.retention_period), retention_storage_mb=self.retention_storage_mb, supported_codecs=self.supported_codecs.to_proto(), @@ -1038,11 +1232,17 @@ class CreateTopicRequest(IToProto, IFromPublic): consumer = ydb_topic_public_types.PublicConsumer(name=consumer) consumers.append(Consumer.from_public(consumer)) + auto_partitioning_settings = None + if req.auto_partitioning_settings is not None: + auto_partitioning_settings = AutoPartitioningSettings.from_public(req.auto_partitioning_settings) + return CreateTopicRequest( path=req.path, partitioning_settings=PartitioningSettings( min_active_partitions=req.min_active_partitions, partition_count_limit=req.partition_count_limit, + max_active_partitions=req.max_active_partitions, + auto_partitioning_settings=auto_partitioning_settings, ), retention_period=req.retention_period, retention_storage_mb=req.retention_storage_mb, @@ -1113,6 +1313,12 @@ class AlterTopicRequest(IToProto, IFromPublic): consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer) alter_consumers.append(AlterConsumer.from_public(consumer)) + alter_auto_partitioning_settings = None + if req.alter_auto_partitioning_settings is not None: + alter_auto_partitioning_settings = AlterAutoPartitioningSettings.from_public( + req.alter_auto_partitioning_settings + ) + drop_consumers = req.drop_consumers if req.drop_consumers else [] return AlterTopicRequest( @@ -1120,6 +1326,8 @@ class AlterTopicRequest(IToProto, IFromPublic): alter_partitioning_settings=AlterPartitioningSettings( set_min_active_partitions=req.set_min_active_partitions, set_partition_count_limit=req.set_partition_count_limit, + set_max_active_partitions=req.set_max_active_partitions, + alter_auto_partitioning_settings=alter_auto_partitioning_settings, ), add_consumers=add_consumers, set_retention_period=req.set_retention_period, @@ -1180,6 +1388,8 @@ class DescribeTopicResult(IFromProtoWithProtoType, IToPublic): return ydb_topic_public_types.PublicDescribeTopicResult( self=scheme._wrap_scheme_entry(self.self_proto), min_active_partitions=self.partitioning_settings.min_active_partitions, + max_active_partitions=self.partitioning_settings.max_active_partitions, + auto_partitioning_settings=self.partitioning_settings.auto_partitioning_settings.to_public(), partition_count_limit=self.partitioning_settings.partition_count_limit, partitions=list(map(DescribeTopicResult.PartitionInfo.to_public, self.partitions)), retention_period=self.retention_period, diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index 917dd53363..e3b118e9ca 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -18,6 +18,7 @@ from ...scheme import SchemeEntry class CreateTopicRequestParams: path: str min_active_partitions: Optional[int] + max_active_partitions: Optional[int] partition_count_limit: Optional[int] retention_period: Optional[datetime.timedelta] retention_storage_mb: Optional[int] @@ -27,12 +28,14 @@ class CreateTopicRequestParams: attributes: Optional[Dict[str, str]] consumers: Optional[List[Union["PublicConsumer", str]]] metering_mode: Optional["PublicMeteringMode"] + auto_partitioning_settings: Optional["PublicAutoPartitioningSettings"] @dataclass class AlterTopicRequestParams: path: str set_min_active_partitions: Optional[int] + set_max_active_partitions: Optional[int] set_partition_count_limit: Optional[int] add_consumers: Optional[List[Union["PublicConsumer", str]]] alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]] @@ -44,6 +47,7 @@ class AlterTopicRequestParams: set_retention_period: Optional[datetime.timedelta] set_retention_storage_mb: Optional[int] set_supported_codecs: Optional[List[Union["PublicCodec", int]]] + alter_auto_partitioning_settings: Optional["PublicAlterAutoPartitioningSettings"] class PublicCodec(int): @@ -67,6 +71,30 @@ class PublicMeteringMode(IntEnum): REQUEST_UNITS = 2 +class PublicAutoPartitioningStrategy(IntEnum): + UNSPECIFIED = 0 + DISABLED = 1 + SCALE_UP = 2 + SCALE_UP_AND_DOWN = 3 + PAUSED = 4 + + +@dataclass +class PublicAutoPartitioningSettings: + strategy: Optional["PublicAutoPartitioningStrategy"] = None + stabilization_window: Optional[datetime.timedelta] = None + down_utilization_percent: Optional[int] = None + up_utilization_percent: Optional[int] = None + + +@dataclass +class PublicAlterAutoPartitioningSettings: + set_strategy: Optional["PublicAutoPartitioningStrategy"] = None + set_stabilization_window: Optional[datetime.timedelta] = None + set_down_utilization_percent: Optional[int] = None + set_up_utilization_percent: Optional[int] = None + + @dataclass class PublicConsumer: name: str @@ -137,6 +165,9 @@ class PublicDescribeTopicResult: min_active_partitions: int "Minimum partition count auto merge would stop working at" + max_active_partitions: int + "Minimum partition count auto split would stop working at" + partition_count_limit: int "Limit for total partition count, including active (open for write) and read-only partitions" @@ -170,6 +201,8 @@ class PublicDescribeTopicResult: topic_stats: "PublicDescribeTopicResult.TopicStats" "Statistics of topic" + auto_partitioning_settings: "PublicAutoPartitioningSettings" + @dataclass class PartitionInfo: partition_id: int diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py index a9c811ac4f..b48501aff2 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py @@ -121,6 +121,16 @@ class PartitionSession: def closed(self): return self.state == PartitionSession.State.Stopped + def end(self): + if self.closed: + return + + self.state = PartitionSession.State.Ended + + @property + def ended(self): + return self.state == PartitionSession.State.Ended + def _ensure_not_closed(self): if self.state == PartitionSession.State.Stopped: raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError() @@ -129,6 +139,7 @@ class PartitionSession: Active = 1 GracefulShutdown = 2 Stopped = 3 + Ended = 4 @dataclass(order=True) class CommitAckWaiter: diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py index b907ee2794..8bc12cc0d8 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py @@ -45,6 +45,7 @@ class PublicReaderSettings: consumer: str topic: TopicSelectorTypes buffer_size_bytes: int = 50 * 1024 * 1024 + auto_partitioning_support: bool = True decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None """decoders: map[codec_code] func(encoded_bytes)->decoded_bytes""" @@ -77,6 +78,7 @@ class PublicReaderSettings: return StreamReadMessage.InitRequest( topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore consumer=self.consumer, + auto_partitioning_support=self.auto_partitioning_support, ) def _retry_settings(self) -> RetrySettings: diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index e407fe01da..7061b4e449 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -388,6 +388,14 @@ class ReaderStream: partition_session_id, batch = self._message_batches.popitem(last=False) return partition_session_id, batch + def _return_batch_to_queue(self, part_sess_id: int, batch: datatypes.PublicBatch): + self._message_batches[part_sess_id] = batch + + # In case of auto-split we should return all parent messages ASAP + # without queue rotation to prevent child's messages before parent's. + if part_sess_id in self._partition_sessions and self._partition_sessions[part_sess_id].ended: + self._message_batches.move_to_end(part_sess_id, last=False) + def receive_batch_nowait(self, max_messages: Optional[int] = None): if self._get_first_error(): raise self._get_first_error() @@ -403,7 +411,8 @@ class ReaderStream: cutted_batch = batch._pop_batch(message_count=max_messages) - self._message_batches[part_sess_id] = batch + self._return_batch_to_queue(part_sess_id, batch) + self._buffer_release_bytes(cutted_batch._bytes_size) return cutted_batch @@ -423,7 +432,7 @@ class ReaderStream: self._buffer_release_bytes(batch._bytes_size) else: # TODO: we should somehow release bytes from single message as well - self._message_batches[part_sess_id] = batch + self._return_batch_to_queue(part_sess_id, batch) return message @@ -498,6 +507,12 @@ class ReaderStream: ): self._on_partition_session_stop(message.server_message) + elif isinstance( + message.server_message, + StreamReadMessage.EndPartitionSession, + ): + self._on_end_partition_session(message.server_message) + elif isinstance(message.server_message, UpdateTokenResponse): self._update_token_event.set() @@ -575,6 +590,16 @@ class ReaderStream: ) ) + def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession): + logger.debug( + f"End partition session with id: {message.partition_session_id}, " + f"child partitions: {message.child_partition_ids}" + ) + + if message.partition_session_id in self._partition_sessions: + # Mark partition session as ended not to shuffle messages. + self._partition_sessions[message.partition_session_id].end() + def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size) diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py index 456896dbb5..f6a84eb0b1 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/pool.py +++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py @@ -13,9 +13,11 @@ from ...retries import ( RetrySettings, retry_operation_async, ) +from ...query.base import BaseQueryTxMode from ...query.base import QueryClientSettings from ... import convert from ..._grpc.grpcwrapper import common_utils +from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public logger = logging.getLogger(__name__) @@ -122,6 +124,39 @@ class QuerySessionPool: return await retry_operation_async(wrapped_callee, retry_settings) + async def retry_tx_async( + self, + callee: Callable, + tx_mode: Optional[BaseQueryTxMode] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ): + """Special interface to execute a bunch of commands with transaction in a safe, retriable way. + + :param callee: A function, that works with session. + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + async def wrapped_callee(): + async with self.checkout() as session: + async with session.transaction(tx_mode=tx_mode) as tx: + result = await callee(tx, *args, **kwargs) + await tx.commit() + return result + + return await retry_operation_async(wrapped_callee, retry_settings) + async def execute_with_retries( self, query: str, diff --git a/contrib/python/ydb/py3/ydb/import_client.py b/contrib/python/ydb/py3/ydb/import_client.py index 830f10c5bb..9a01e5a508 100644 --- a/contrib/python/ydb/py3/ydb/import_client.py +++ b/contrib/python/ydb/py3/ydb/import_client.py @@ -32,7 +32,10 @@ class ImportProgress(enum.IntEnum): def _initialize_progresses(): for key, value in ydb_import_pb2.ImportProgress.Progress.items(): - _progresses[value] = getattr(ImportProgress, key[len("PROGRESS_") :]) + try: + _progresses[value] = getattr(ImportProgress, key[len("PROGRESS_") :]) + except AttributeError: + pass _initialize_progresses() diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index f1fcd17360..e3775c4dd1 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -8,6 +8,7 @@ import time import threading import queue +from .base import BaseQueryTxMode from .base import QueryClientSettings from .session import ( QuerySession, @@ -20,6 +21,7 @@ from .. import issues from .. import convert from ..settings import BaseRequestSettings from .._grpc.grpcwrapper import common_utils +from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public logger = logging.getLogger(__name__) @@ -138,6 +140,39 @@ class QuerySessionPool: return retry_operation_sync(wrapped_callee, retry_settings) + def retry_tx_sync( + self, + callee: Callable, + tx_mode: Optional[BaseQueryTxMode] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ): + """Special interface to execute a bunch of commands with transaction in a safe, retriable way. + + :param callee: A function, that works with session. + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + def wrapped_callee(): + with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session: + with session.transaction(tx_mode=tx_mode) as tx: + result = callee(tx, *args, **kwargs) + tx.commit() + return result + + return retry_operation_sync(wrapped_callee, retry_settings) + def execute_with_retries( self, query: str, diff --git a/contrib/python/ydb/py3/ydb/scheme.py b/contrib/python/ydb/py3/ydb/scheme.py index 04951b5eae..263d1c65d3 100644 --- a/contrib/python/ydb/py3/ydb/scheme.py +++ b/contrib/python/ydb/py3/ydb/scheme.py @@ -24,6 +24,10 @@ class SchemeEntryType(enum.IntEnum): SEQUENCE = 15 REPLICATION = 16 TOPIC = 17 + EXTERNAL_TABLE = 18 + EXTERNAL_DATA_SOURCE = 19 + VIEW = 20 + RESOURCE_POOL = 21 @classmethod def _missing_(cls, value): @@ -103,6 +107,38 @@ class SchemeEntryType(enum.IntEnum): """ return entry == SchemeEntryType.DATABASE or entry == SchemeEntryType.DIRECTORY + @staticmethod + def is_external_table(entry): + """ + :param entry: A scheme entry to check + :return: True if scheme entry is an external table and False otherwise + """ + return entry == SchemeEntryType.EXTERNAL_TABLE + + @staticmethod + def is_external_data_source(entry): + """ + :param entry: A scheme entry to check + :return: True if scheme entry is an external data source and False otherwise + """ + return entry == SchemeEntryType.EXTERNAL_DATA_SOURCE + + @staticmethod + def is_external_view(entry): + """ + :param entry: A scheme entry to check + :return: True if scheme entry is a view and False otherwise + """ + return entry == SchemeEntryType.VIEW + + @staticmethod + def is_external_resource_pool(entry): + """ + :param entry: A scheme entry to check + :return: True if scheme entry is a resource pool and False otherwise + """ + return entry == SchemeEntryType.RESOURCE_POOL + class SchemeEntry(object): __slots__ = ( @@ -185,6 +221,30 @@ class SchemeEntry(object): """ return SchemeEntryType.is_coordination_node(self.type) + def is_external_table(self): + """ + :return: True if scheme entry is an external table and False otherwise + """ + return SchemeEntryType.is_external_table(self.type) + + def is_external_data_source(self): + """ + :return: True if scheme entry is an external data source and False otherwise + """ + return SchemeEntryType.is_external_data_source(self.type) + + def is_view(self): + """ + :return: True if scheme entry is a view and False otherwise + """ + return SchemeEntryType.is_view(self.type) + + def is_resource_pool(self): + """ + :return: True if scheme entry is a resource pool and False otherwise + """ + return SchemeEntryType.is_resource_pool(self.type) + class Directory(SchemeEntry): __slots__ = ("children",) diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py index f0b872e297..55f4ea04c5 100644 --- a/contrib/python/ydb/py3/ydb/topic.py +++ b/contrib/python/ydb/py3/ydb/topic.py @@ -7,6 +7,9 @@ __all__ = [ "TopicCodec", "TopicConsumer", "TopicAlterConsumer", + "TopicAlterAutoPartitioningSettings", + "TopicAutoPartitioningSettings", + "TopicAutoPartitioningStrategy", "TopicDescription", "TopicError", "TopicMeteringMode", @@ -80,6 +83,9 @@ from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401 PublicConsumer as TopicConsumer, PublicAlterConsumer as TopicAlterConsumer, PublicMeteringMode as TopicMeteringMode, + PublicAutoPartitioningStrategy as TopicAutoPartitioningStrategy, + PublicAutoPartitioningSettings as TopicAutoPartitioningSettings, + PublicAlterAutoPartitioningSettings as TopicAlterAutoPartitioningSettings, ) @@ -108,6 +114,7 @@ class TopicClientAsyncIO: self, path: str, min_active_partitions: Optional[int] = None, + max_active_partitions: Optional[int] = None, partition_count_limit: Optional[int] = None, retention_period: Optional[datetime.timedelta] = None, retention_storage_mb: Optional[int] = None, @@ -117,6 +124,7 @@ class TopicClientAsyncIO: attributes: Optional[Dict[str, str]] = None, consumers: Optional[List[Union[TopicConsumer, str]]] = None, metering_mode: Optional[TopicMeteringMode] = None, + auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None, ): """ create topic command @@ -151,6 +159,7 @@ class TopicClientAsyncIO: self, path: str, set_min_active_partitions: Optional[int] = None, + set_max_active_partitions: Optional[int] = None, set_partition_count_limit: Optional[int] = None, add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, @@ -162,6 +171,7 @@ class TopicClientAsyncIO: set_retention_period: Optional[datetime.timedelta] = None, set_retention_storage_mb: Optional[int] = None, set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None, ): """ alter topic command @@ -226,6 +236,7 @@ class TopicClientAsyncIO: # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, + auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -305,6 +316,7 @@ class TopicClient: self, path: str, min_active_partitions: Optional[int] = None, + max_active_partitions: Optional[int] = None, partition_count_limit: Optional[int] = None, retention_period: Optional[datetime.timedelta] = None, retention_storage_mb: Optional[int] = None, @@ -314,6 +326,7 @@ class TopicClient: attributes: Optional[Dict[str, str]] = None, consumers: Optional[List[Union[TopicConsumer, str]]] = None, metering_mode: Optional[TopicMeteringMode] = None, + auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None, ): """ create topic command @@ -350,6 +363,7 @@ class TopicClient: self, path: str, set_min_active_partitions: Optional[int] = None, + set_max_active_partitions: Optional[int] = None, set_partition_count_limit: Optional[int] = None, add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, @@ -361,6 +375,7 @@ class TopicClient: set_retention_period: Optional[datetime.timedelta] = None, set_retention_storage_mb: Optional[int] = None, set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None, ): """ alter topic command @@ -431,6 +446,7 @@ class TopicClient: # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool + auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. ) -> TopicReader: if not decoder_executor: decoder_executor = self._executor diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index bdc80c2111..750aee1df8 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.18.15" +VERSION = "3.18.16" |