diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-13 18:07:16 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-13 18:18:39 +0300 |
commit | d409ca87337a6863040a623686b8ed4c7f96cf10 (patch) | |
tree | 7917be21e44c6d4cc5c100da54e910b7f7271f75 | |
parent | ee1586dbe7089790d721f00e52026c958098e9cd (diff) | |
download | ydb-d409ca87337a6863040a623686b8ed4c7f96cf10.tar.gz |
Intermediate changes
commit_hash:ed866127668ea37af240d0f863a53784df961351
-rw-r--r-- | contrib/python/ydb/py3/.dist-info/METADATA | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ya.make | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py | 214 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py | 33 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py | 11 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py | 29 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/pool.py | 35 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/import_client.py | 5 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/pool.py | 35 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/scheme.py | 60 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/topic.py | 16 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/ydb_version.py | 2 | ||||
-rw-r--r-- | yql/essentials/core/qplayer/storage/file/yql_qstorage_file.cpp | 6 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/sensor_service.cpp | 7 |
15 files changed, 444 insertions, 15 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 181a3da9c5..aa17898a66 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.18.15 +Version: 3.18.16 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index 1da9bb152c..8b10140b34 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.18.15) +VERSION(3.18.16) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py index 972003989c..d1872f4245 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -418,12 +418,14 @@ class StreamReadMessage: class InitRequest(IToProto): topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"] consumer: str + auto_partitioning_support: bool def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest: res = ydb_topic_pb2.StreamReadMessage.InitRequest() res.consumer = self.consumer for settings in self.topics_read_settings: res.topics_read_settings.append(settings.to_proto()) + res.auto_partitioning_support = self.auto_partitioning_support return res @dataclass @@ -696,6 +698,20 @@ class StreamReadMessage: ) @dataclass + class EndPartitionSession(IFromProto): + partition_session_id: int + adjacent_partition_ids: List[int] + child_partition_ids: List[int] + + @staticmethod + def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession): + return StreamReadMessage.EndPartitionSession( + partition_session_id=msg.partition_session_id, + adjacent_partition_ids=list(msg.adjacent_partition_ids), + child_partition_ids=list(msg.child_partition_ids), + ) + + @dataclass class FromClient(IToProto): client_message: "ReaderMessagesFromClientToServer" @@ -774,6 +790,13 @@ class StreamReadMessage: msg.partition_session_status_response ), ) + elif mess_type == "end_partition_session": + return StreamReadMessage.FromServer( + server_status=server_status, + server_message=StreamReadMessage.EndPartitionSession.from_proto( + msg.end_partition_session, + ), + ) else: raise issues.UnexpectedGrpcMessage( "Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type @@ -798,6 +821,7 @@ ReaderMessagesFromServerToClient = Union[ UpdateTokenResponse, StreamReadMessage.StartPartitionSessionRequest, StreamReadMessage.StopPartitionSessionRequest, + StreamReadMessage.EndPartitionSession, ] @@ -942,18 +966,130 @@ class AlterConsumer(IToProto, IFromPublic): class PartitioningSettings(IToProto, IFromProto): min_active_partitions: int partition_count_limit: int + max_active_partitions: int + auto_partitioning_settings: AutoPartitioningSettings @staticmethod def from_proto(msg: ydb_topic_pb2.PartitioningSettings) -> "PartitioningSettings": return PartitioningSettings( min_active_partitions=msg.min_active_partitions, partition_count_limit=msg.partition_count_limit, + max_active_partitions=msg.max_active_partitions, + auto_partitioning_settings=AutoPartitioningSettings.from_proto(msg.auto_partitioning_settings), ) def to_proto(self) -> ydb_topic_pb2.PartitioningSettings: + auto_partitioning_settings = None + if self.auto_partitioning_settings is not None: + auto_partitioning_settings = self.auto_partitioning_settings.to_proto() + return ydb_topic_pb2.PartitioningSettings( min_active_partitions=self.min_active_partitions, partition_count_limit=self.partition_count_limit, + max_active_partitions=self.max_active_partitions, + auto_partitioning_settings=auto_partitioning_settings, + ) + + +class AutoPartitioningStrategy(int, IFromProto, IFromPublic, IToPublic): + UNSPECIFIED = 0 + DISABLED = 1 + SCALE_UP = 2 + SCALE_UP_AND_DOWN = 3 + PAUSED = 4 + + @staticmethod + def from_public( + strategy: Optional[ydb_topic_public_types.PublicAutoPartitioningStrategy], + ) -> Optional["AutoPartitioningStrategy"]: + if strategy is None: + return None + + return AutoPartitioningStrategy(strategy) + + @staticmethod + def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]: + if code is None: + return None + + return AutoPartitioningStrategy(code) + + def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy: + try: + return ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self)) + except KeyError: + return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED + + +@dataclass +class AutoPartitioningSettings(IToProto, IFromProto, IFromPublic, IToPublic): + strategy: AutoPartitioningStrategy + partition_write_speed: AutoPartitioningWriteSpeedStrategy + + @staticmethod + def from_public( + settings: Optional[ydb_topic_public_types.PublicAutoPartitioningSettings], + ) -> Optional[AutoPartitioningSettings]: + if not settings: + return None + + return AutoPartitioningSettings( + strategy=settings.strategy, + partition_write_speed=AutoPartitioningWriteSpeedStrategy( + stabilization_window=settings.stabilization_window, + up_utilization_percent=settings.up_utilization_percent, + down_utilization_percent=settings.down_utilization_percent, + ), + ) + + @staticmethod + def from_proto(msg: ydb_topic_pb2.AutoPartitioningSettings) -> AutoPartitioningSettings: + if msg is None: + return None + + return AutoPartitioningSettings( + strategy=AutoPartitioningStrategy.from_proto(msg.strategy), + partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto(msg.partition_write_speed), + ) + + def to_proto(self) -> ydb_topic_pb2.AutoPartitioningSettings: + return ydb_topic_pb2.AutoPartitioningSettings( + strategy=self.strategy, partition_write_speed=self.partition_write_speed.to_proto() + ) + + def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningSettings: + return ydb_topic_public_types.PublicAutoPartitioningSettings( + strategy=self.strategy.to_public(), + stabilization_window=self.partition_write_speed.stabilization_window, + up_utilization_percent=self.partition_write_speed.up_utilization_percent, + down_utilization_percent=self.partition_write_speed.down_utilization_percent, + ) + + +@dataclass +class AutoPartitioningWriteSpeedStrategy(IToProto, IFromProto): + stabilization_window: Optional[datetime.timedelta] + up_utilization_percent: Optional[int] + down_utilization_percent: Optional[int] + + def to_proto(self): + return ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy( + stabilization_window=proto_duration_from_timedelta(self.stabilization_window), + up_utilization_percent=self.up_utilization_percent, + down_utilization_percent=self.down_utilization_percent, + ) + + @staticmethod + def from_proto( + msg: Optional[ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy], + ) -> Optional[AutoPartitioningWriteSpeedStrategy]: + if msg is None: + return None + + return AutoPartitioningWriteSpeedStrategy( + stabilization_window=timedelta_from_proto_duration(msg.stabilization_window), + up_utilization_percent=msg.up_utilization_percent, + down_utilization_percent=msg.down_utilization_percent, ) @@ -961,11 +1097,65 @@ class PartitioningSettings(IToProto, IFromProto): class AlterPartitioningSettings(IToProto): set_min_active_partitions: Optional[int] set_partition_count_limit: Optional[int] + set_max_active_partitions: Optional[int] + alter_auto_partitioning_settings: Optional[AlterAutoPartitioningSettings] def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings: + alter_auto_partitioning_settings = None + if self.alter_auto_partitioning_settings is not None: + alter_auto_partitioning_settings = self.alter_auto_partitioning_settings.to_proto() + return ydb_topic_pb2.AlterPartitioningSettings( set_min_active_partitions=self.set_min_active_partitions, set_partition_count_limit=self.set_partition_count_limit, + set_max_active_partitions=self.set_max_active_partitions, + alter_auto_partitioning_settings=alter_auto_partitioning_settings, + ) + + +@dataclass +class AlterAutoPartitioningSettings(IToProto, IFromPublic): + set_strategy: Optional[AutoPartitioningStrategy] + set_partition_write_speed: Optional[AlterAutoPartitioningWriteSpeedStrategy] + + @staticmethod + def from_public( + settings: Optional[ydb_topic_public_types.PublicAlterAutoPartitioningSettings], + ) -> Optional[AlterAutoPartitioningSettings]: + if not settings: + return None + + return AlterAutoPartitioningSettings( + set_strategy=settings.set_strategy, + set_partition_write_speed=AlterAutoPartitioningWriteSpeedStrategy( + set_stabilization_window=settings.set_stabilization_window, + set_up_utilization_percent=settings.set_up_utilization_percent, + set_down_utilization_percent=settings.set_down_utilization_percent, + ), + ) + + def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningSettings: + set_partition_write_speed = None + if self.set_partition_write_speed: + set_partition_write_speed = self.set_partition_write_speed.to_proto() + + return ydb_topic_pb2.AlterAutoPartitioningSettings( + set_strategy=self.set_strategy, + set_partition_write_speed=set_partition_write_speed, + ) + + +@dataclass +class AlterAutoPartitioningWriteSpeedStrategy(IToProto): + set_stabilization_window: Optional[datetime.timedelta] + set_up_utilization_percent: Optional[int] + set_down_utilization_percent: Optional[int] + + def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy: + return ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy( + set_stabilization_window=proto_duration_from_timedelta(self.set_stabilization_window), + set_up_utilization_percent=self.set_up_utilization_percent, + set_down_utilization_percent=self.set_down_utilization_percent, ) @@ -992,7 +1182,7 @@ class MeteringMode(int, IFromProto, IFromPublic, IToPublic): def to_public(self) -> ydb_topic_public_types.PublicMeteringMode: try: - ydb_topic_public_types.PublicMeteringMode(int(self)) + return ydb_topic_public_types.PublicMeteringMode(int(self)) except KeyError: return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED @@ -1011,9 +1201,13 @@ class CreateTopicRequest(IToProto, IFromPublic): metering_mode: "MeteringMode" def to_proto(self) -> ydb_topic_pb2.CreateTopicRequest: + partitioning_settings = None + if self.partitioning_settings is not None: + partitioning_settings = self.partitioning_settings.to_proto() + return ydb_topic_pb2.CreateTopicRequest( path=self.path, - partitioning_settings=self.partitioning_settings.to_proto(), + partitioning_settings=partitioning_settings, retention_period=proto_duration_from_timedelta(self.retention_period), retention_storage_mb=self.retention_storage_mb, supported_codecs=self.supported_codecs.to_proto(), @@ -1038,11 +1232,17 @@ class CreateTopicRequest(IToProto, IFromPublic): consumer = ydb_topic_public_types.PublicConsumer(name=consumer) consumers.append(Consumer.from_public(consumer)) + auto_partitioning_settings = None + if req.auto_partitioning_settings is not None: + auto_partitioning_settings = AutoPartitioningSettings.from_public(req.auto_partitioning_settings) + return CreateTopicRequest( path=req.path, partitioning_settings=PartitioningSettings( min_active_partitions=req.min_active_partitions, partition_count_limit=req.partition_count_limit, + max_active_partitions=req.max_active_partitions, + auto_partitioning_settings=auto_partitioning_settings, ), retention_period=req.retention_period, retention_storage_mb=req.retention_storage_mb, @@ -1113,6 +1313,12 @@ class AlterTopicRequest(IToProto, IFromPublic): consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer) alter_consumers.append(AlterConsumer.from_public(consumer)) + alter_auto_partitioning_settings = None + if req.alter_auto_partitioning_settings is not None: + alter_auto_partitioning_settings = AlterAutoPartitioningSettings.from_public( + req.alter_auto_partitioning_settings + ) + drop_consumers = req.drop_consumers if req.drop_consumers else [] return AlterTopicRequest( @@ -1120,6 +1326,8 @@ class AlterTopicRequest(IToProto, IFromPublic): alter_partitioning_settings=AlterPartitioningSettings( set_min_active_partitions=req.set_min_active_partitions, set_partition_count_limit=req.set_partition_count_limit, + set_max_active_partitions=req.set_max_active_partitions, + alter_auto_partitioning_settings=alter_auto_partitioning_settings, ), add_consumers=add_consumers, set_retention_period=req.set_retention_period, @@ -1180,6 +1388,8 @@ class DescribeTopicResult(IFromProtoWithProtoType, IToPublic): return ydb_topic_public_types.PublicDescribeTopicResult( self=scheme._wrap_scheme_entry(self.self_proto), min_active_partitions=self.partitioning_settings.min_active_partitions, + max_active_partitions=self.partitioning_settings.max_active_partitions, + auto_partitioning_settings=self.partitioning_settings.auto_partitioning_settings.to_public(), partition_count_limit=self.partitioning_settings.partition_count_limit, partitions=list(map(DescribeTopicResult.PartitionInfo.to_public, self.partitions)), retention_period=self.retention_period, diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index 917dd53363..e3b118e9ca 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -18,6 +18,7 @@ from ...scheme import SchemeEntry class CreateTopicRequestParams: path: str min_active_partitions: Optional[int] + max_active_partitions: Optional[int] partition_count_limit: Optional[int] retention_period: Optional[datetime.timedelta] retention_storage_mb: Optional[int] @@ -27,12 +28,14 @@ class CreateTopicRequestParams: attributes: Optional[Dict[str, str]] consumers: Optional[List[Union["PublicConsumer", str]]] metering_mode: Optional["PublicMeteringMode"] + auto_partitioning_settings: Optional["PublicAutoPartitioningSettings"] @dataclass class AlterTopicRequestParams: path: str set_min_active_partitions: Optional[int] + set_max_active_partitions: Optional[int] set_partition_count_limit: Optional[int] add_consumers: Optional[List[Union["PublicConsumer", str]]] alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]] @@ -44,6 +47,7 @@ class AlterTopicRequestParams: set_retention_period: Optional[datetime.timedelta] set_retention_storage_mb: Optional[int] set_supported_codecs: Optional[List[Union["PublicCodec", int]]] + alter_auto_partitioning_settings: Optional["PublicAlterAutoPartitioningSettings"] class PublicCodec(int): @@ -67,6 +71,30 @@ class PublicMeteringMode(IntEnum): REQUEST_UNITS = 2 +class PublicAutoPartitioningStrategy(IntEnum): + UNSPECIFIED = 0 + DISABLED = 1 + SCALE_UP = 2 + SCALE_UP_AND_DOWN = 3 + PAUSED = 4 + + +@dataclass +class PublicAutoPartitioningSettings: + strategy: Optional["PublicAutoPartitioningStrategy"] = None + stabilization_window: Optional[datetime.timedelta] = None + down_utilization_percent: Optional[int] = None + up_utilization_percent: Optional[int] = None + + +@dataclass +class PublicAlterAutoPartitioningSettings: + set_strategy: Optional["PublicAutoPartitioningStrategy"] = None + set_stabilization_window: Optional[datetime.timedelta] = None + set_down_utilization_percent: Optional[int] = None + set_up_utilization_percent: Optional[int] = None + + @dataclass class PublicConsumer: name: str @@ -137,6 +165,9 @@ class PublicDescribeTopicResult: min_active_partitions: int "Minimum partition count auto merge would stop working at" + max_active_partitions: int + "Minimum partition count auto split would stop working at" + partition_count_limit: int "Limit for total partition count, including active (open for write) and read-only partitions" @@ -170,6 +201,8 @@ class PublicDescribeTopicResult: topic_stats: "PublicDescribeTopicResult.TopicStats" "Statistics of topic" + auto_partitioning_settings: "PublicAutoPartitioningSettings" + @dataclass class PartitionInfo: partition_id: int diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py index a9c811ac4f..b48501aff2 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py @@ -121,6 +121,16 @@ class PartitionSession: def closed(self): return self.state == PartitionSession.State.Stopped + def end(self): + if self.closed: + return + + self.state = PartitionSession.State.Ended + + @property + def ended(self): + return self.state == PartitionSession.State.Ended + def _ensure_not_closed(self): if self.state == PartitionSession.State.Stopped: raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError() @@ -129,6 +139,7 @@ class PartitionSession: Active = 1 GracefulShutdown = 2 Stopped = 3 + Ended = 4 @dataclass(order=True) class CommitAckWaiter: diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py index b907ee2794..8bc12cc0d8 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py @@ -45,6 +45,7 @@ class PublicReaderSettings: consumer: str topic: TopicSelectorTypes buffer_size_bytes: int = 50 * 1024 * 1024 + auto_partitioning_support: bool = True decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None """decoders: map[codec_code] func(encoded_bytes)->decoded_bytes""" @@ -77,6 +78,7 @@ class PublicReaderSettings: return StreamReadMessage.InitRequest( topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore consumer=self.consumer, + auto_partitioning_support=self.auto_partitioning_support, ) def _retry_settings(self) -> RetrySettings: diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index e407fe01da..7061b4e449 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -388,6 +388,14 @@ class ReaderStream: partition_session_id, batch = self._message_batches.popitem(last=False) return partition_session_id, batch + def _return_batch_to_queue(self, part_sess_id: int, batch: datatypes.PublicBatch): + self._message_batches[part_sess_id] = batch + + # In case of auto-split we should return all parent messages ASAP + # without queue rotation to prevent child's messages before parent's. + if part_sess_id in self._partition_sessions and self._partition_sessions[part_sess_id].ended: + self._message_batches.move_to_end(part_sess_id, last=False) + def receive_batch_nowait(self, max_messages: Optional[int] = None): if self._get_first_error(): raise self._get_first_error() @@ -403,7 +411,8 @@ class ReaderStream: cutted_batch = batch._pop_batch(message_count=max_messages) - self._message_batches[part_sess_id] = batch + self._return_batch_to_queue(part_sess_id, batch) + self._buffer_release_bytes(cutted_batch._bytes_size) return cutted_batch @@ -423,7 +432,7 @@ class ReaderStream: self._buffer_release_bytes(batch._bytes_size) else: # TODO: we should somehow release bytes from single message as well - self._message_batches[part_sess_id] = batch + self._return_batch_to_queue(part_sess_id, batch) return message @@ -498,6 +507,12 @@ class ReaderStream: ): self._on_partition_session_stop(message.server_message) + elif isinstance( + message.server_message, + StreamReadMessage.EndPartitionSession, + ): + self._on_end_partition_session(message.server_message) + elif isinstance(message.server_message, UpdateTokenResponse): self._update_token_event.set() @@ -575,6 +590,16 @@ class ReaderStream: ) ) + def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession): + logger.debug( + f"End partition session with id: {message.partition_session_id}, " + f"child partitions: {message.child_partition_ids}" + ) + + if message.partition_session_id in self._partition_sessions: + # Mark partition session as ended not to shuffle messages. + self._partition_sessions[message.partition_session_id].end() + def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size) diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py index 456896dbb5..f6a84eb0b1 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/pool.py +++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py @@ -13,9 +13,11 @@ from ...retries import ( RetrySettings, retry_operation_async, ) +from ...query.base import BaseQueryTxMode from ...query.base import QueryClientSettings from ... import convert from ..._grpc.grpcwrapper import common_utils +from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public logger = logging.getLogger(__name__) @@ -122,6 +124,39 @@ class QuerySessionPool: return await retry_operation_async(wrapped_callee, retry_settings) + async def retry_tx_async( + self, + callee: Callable, + tx_mode: Optional[BaseQueryTxMode] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ): + """Special interface to execute a bunch of commands with transaction in a safe, retriable way. + + :param callee: A function, that works with session. + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + async def wrapped_callee(): + async with self.checkout() as session: + async with session.transaction(tx_mode=tx_mode) as tx: + result = await callee(tx, *args, **kwargs) + await tx.commit() + return result + + return await retry_operation_async(wrapped_callee, retry_settings) + async def execute_with_retries( self, query: str, diff --git a/contrib/python/ydb/py3/ydb/import_client.py b/contrib/python/ydb/py3/ydb/import_client.py index 830f10c5bb..9a01e5a508 100644 --- a/contrib/python/ydb/py3/ydb/import_client.py +++ b/contrib/python/ydb/py3/ydb/import_client.py @@ -32,7 +32,10 @@ class ImportProgress(enum.IntEnum): def _initialize_progresses(): for key, value in ydb_import_pb2.ImportProgress.Progress.items(): - _progresses[value] = getattr(ImportProgress, key[len("PROGRESS_") :]) + try: + _progresses[value] = getattr(ImportProgress, key[len("PROGRESS_") :]) + except AttributeError: + pass _initialize_progresses() diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index f1fcd17360..e3775c4dd1 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -8,6 +8,7 @@ import time import threading import queue +from .base import BaseQueryTxMode from .base import QueryClientSettings from .session import ( QuerySession, @@ -20,6 +21,7 @@ from .. import issues from .. import convert from ..settings import BaseRequestSettings from .._grpc.grpcwrapper import common_utils +from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public logger = logging.getLogger(__name__) @@ -138,6 +140,39 @@ class QuerySessionPool: return retry_operation_sync(wrapped_callee, retry_settings) + def retry_tx_sync( + self, + callee: Callable, + tx_mode: Optional[BaseQueryTxMode] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ): + """Special interface to execute a bunch of commands with transaction in a safe, retriable way. + + :param callee: A function, that works with session. + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + def wrapped_callee(): + with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session: + with session.transaction(tx_mode=tx_mode) as tx: + result = callee(tx, *args, **kwargs) + tx.commit() + return result + + return retry_operation_sync(wrapped_callee, retry_settings) + def execute_with_retries( self, query: str, diff --git a/contrib/python/ydb/py3/ydb/scheme.py b/contrib/python/ydb/py3/ydb/scheme.py index 04951b5eae..263d1c65d3 100644 --- a/contrib/python/ydb/py3/ydb/scheme.py +++ b/contrib/python/ydb/py3/ydb/scheme.py @@ -24,6 +24,10 @@ class SchemeEntryType(enum.IntEnum): SEQUENCE = 15 REPLICATION = 16 TOPIC = 17 + EXTERNAL_TABLE = 18 + EXTERNAL_DATA_SOURCE = 19 + VIEW = 20 + RESOURCE_POOL = 21 @classmethod def _missing_(cls, value): @@ -103,6 +107,38 @@ class SchemeEntryType(enum.IntEnum): """ return entry == SchemeEntryType.DATABASE or entry == SchemeEntryType.DIRECTORY + @staticmethod + def is_external_table(entry): + """ + :param entry: A scheme entry to check + :return: True if scheme entry is an external table and False otherwise + """ + return entry == SchemeEntryType.EXTERNAL_TABLE + + @staticmethod + def is_external_data_source(entry): + """ + :param entry: A scheme entry to check + :return: True if scheme entry is an external data source and False otherwise + """ + return entry == SchemeEntryType.EXTERNAL_DATA_SOURCE + + @staticmethod + def is_external_view(entry): + """ + :param entry: A scheme entry to check + :return: True if scheme entry is a view and False otherwise + """ + return entry == SchemeEntryType.VIEW + + @staticmethod + def is_external_resource_pool(entry): + """ + :param entry: A scheme entry to check + :return: True if scheme entry is a resource pool and False otherwise + """ + return entry == SchemeEntryType.RESOURCE_POOL + class SchemeEntry(object): __slots__ = ( @@ -185,6 +221,30 @@ class SchemeEntry(object): """ return SchemeEntryType.is_coordination_node(self.type) + def is_external_table(self): + """ + :return: True if scheme entry is an external table and False otherwise + """ + return SchemeEntryType.is_external_table(self.type) + + def is_external_data_source(self): + """ + :return: True if scheme entry is an external data source and False otherwise + """ + return SchemeEntryType.is_external_data_source(self.type) + + def is_view(self): + """ + :return: True if scheme entry is a view and False otherwise + """ + return SchemeEntryType.is_view(self.type) + + def is_resource_pool(self): + """ + :return: True if scheme entry is a resource pool and False otherwise + """ + return SchemeEntryType.is_resource_pool(self.type) + class Directory(SchemeEntry): __slots__ = ("children",) diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py index f0b872e297..55f4ea04c5 100644 --- a/contrib/python/ydb/py3/ydb/topic.py +++ b/contrib/python/ydb/py3/ydb/topic.py @@ -7,6 +7,9 @@ __all__ = [ "TopicCodec", "TopicConsumer", "TopicAlterConsumer", + "TopicAlterAutoPartitioningSettings", + "TopicAutoPartitioningSettings", + "TopicAutoPartitioningStrategy", "TopicDescription", "TopicError", "TopicMeteringMode", @@ -80,6 +83,9 @@ from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401 PublicConsumer as TopicConsumer, PublicAlterConsumer as TopicAlterConsumer, PublicMeteringMode as TopicMeteringMode, + PublicAutoPartitioningStrategy as TopicAutoPartitioningStrategy, + PublicAutoPartitioningSettings as TopicAutoPartitioningSettings, + PublicAlterAutoPartitioningSettings as TopicAlterAutoPartitioningSettings, ) @@ -108,6 +114,7 @@ class TopicClientAsyncIO: self, path: str, min_active_partitions: Optional[int] = None, + max_active_partitions: Optional[int] = None, partition_count_limit: Optional[int] = None, retention_period: Optional[datetime.timedelta] = None, retention_storage_mb: Optional[int] = None, @@ -117,6 +124,7 @@ class TopicClientAsyncIO: attributes: Optional[Dict[str, str]] = None, consumers: Optional[List[Union[TopicConsumer, str]]] = None, metering_mode: Optional[TopicMeteringMode] = None, + auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None, ): """ create topic command @@ -151,6 +159,7 @@ class TopicClientAsyncIO: self, path: str, set_min_active_partitions: Optional[int] = None, + set_max_active_partitions: Optional[int] = None, set_partition_count_limit: Optional[int] = None, add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, @@ -162,6 +171,7 @@ class TopicClientAsyncIO: set_retention_period: Optional[datetime.timedelta] = None, set_retention_storage_mb: Optional[int] = None, set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None, ): """ alter topic command @@ -226,6 +236,7 @@ class TopicClientAsyncIO: # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, + auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -305,6 +316,7 @@ class TopicClient: self, path: str, min_active_partitions: Optional[int] = None, + max_active_partitions: Optional[int] = None, partition_count_limit: Optional[int] = None, retention_period: Optional[datetime.timedelta] = None, retention_storage_mb: Optional[int] = None, @@ -314,6 +326,7 @@ class TopicClient: attributes: Optional[Dict[str, str]] = None, consumers: Optional[List[Union[TopicConsumer, str]]] = None, metering_mode: Optional[TopicMeteringMode] = None, + auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None, ): """ create topic command @@ -350,6 +363,7 @@ class TopicClient: self, path: str, set_min_active_partitions: Optional[int] = None, + set_max_active_partitions: Optional[int] = None, set_partition_count_limit: Optional[int] = None, add_consumers: Optional[List[Union[TopicConsumer, str]]] = None, alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None, @@ -361,6 +375,7 @@ class TopicClient: set_retention_period: Optional[datetime.timedelta] = None, set_retention_storage_mb: Optional[int] = None, set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None, + alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None, ): """ alter topic command @@ -431,6 +446,7 @@ class TopicClient: # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool + auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. ) -> TopicReader: if not decoder_executor: decoder_executor = self._executor diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index bdc80c2111..750aee1df8 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.18.15" +VERSION = "3.18.16" diff --git a/yql/essentials/core/qplayer/storage/file/yql_qstorage_file.cpp b/yql/essentials/core/qplayer/storage/file/yql_qstorage_file.cpp index 339da19ddc..6448f6f889 100644 --- a/yql/essentials/core/qplayer/storage/file/yql_qstorage_file.cpp +++ b/yql/essentials/core/qplayer/storage/file/yql_qstorage_file.cpp @@ -166,12 +166,6 @@ public: } } - void Close() override final { - with_lock(Mutex_) { - DataFile_.Clear(); - } - } - private: const TQWriterSettings Settings_; const bool AlwaysFlushIndex_; diff --git a/yt/yt/library/profiling/solomon/sensor_service.cpp b/yt/yt/library/profiling/solomon/sensor_service.cpp index 291581f182..d6f814f2ea 100644 --- a/yt/yt/library/profiling/solomon/sensor_service.cpp +++ b/yt/yt/library/profiling/solomon/sensor_service.cpp @@ -137,6 +137,9 @@ public: , RootSensorServiceImpl_(New<TSensorServiceImpl>(/*name*/ std::string(), Registry_.Get(), &Exporter_->Lock_)) , Root_(GetEphemeralNodeFactory(/*shouldHideAttributes*/ true)->CreateMap()) , SensorTreeUpdateDuration_(Registry_->GetSelfProfiler().Timer("/sensor_service_tree_update_duration")) + { } + + void Initialize() { UpdateSensorTreeExecutor_ = New<TPeriodicExecutor>( Exporter_->ControlQueue_->GetInvoker(), @@ -272,10 +275,12 @@ IYPathServicePtr CreateSensorService( TSolomonRegistryPtr registry, TSolomonExporterPtr exporter) { - return New<TSensorService>( + auto service = New<TSensorService>( std::move(config), std::move(registry), std::move(exporter)); + service->Initialize(); + return service; } //////////////////////////////////////////////////////////////////////////////// |