aboutsummaryrefslogtreecommitdiffstats
path: root/contrib
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-02-14 00:51:34 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-02-14 00:51:34 +0000
commit7c59c24919f9e86614d1cd19c62829e01dd54097 (patch)
treed42e64eb38c0388a0f4f3ad148bad8938324e279 /contrib
parent28180f60aec6dcb2b662b6417c90226553ebe2dc (diff)
parentc26bb8abd161c590e8cb0e7280a14c335c1eb893 (diff)
downloadydb-7c59c24919f9e86614d1cd19c62829e01dd54097.tar.gz
Merge branch 'rightlib' into merge-libs-250214-0050
Diffstat (limited to 'contrib')
-rw-r--r--contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h20
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make2
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py214
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py33
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py11
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py2
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py29
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/pool.py35
-rw-r--r--contrib/python/ydb/py3/ydb/import_client.py5
-rw-r--r--contrib/python/ydb/py3/ydb/query/pool.py35
-rw-r--r--contrib/python/ydb/py3/ydb/scheme.py60
-rw-r--r--contrib/python/ydb/py3/ydb/topic.py16
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
14 files changed, 453 insertions, 13 deletions
diff --git a/contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h b/contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h
index 1fbd7d3a2c..694406acc9 100644
--- a/contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h
+++ b/contrib/libs/poco/Crypto/include/Poco/Crypto/OpenSSLInitializer.h
@@ -28,6 +28,16 @@
#endif
+#ifndef POCO_CRYPT_NO_SANITIZE_THREAD
+ #define POCO_CRYPT_NO_SANITIZE_THREAD
+ #if defined(__has_feature)
+ #if __has_feature(thread_sanitizer)
+ #undef POCO_CRYPT_NO_SANITIZE_THREAD
+ #define POCO_CRYPT_NO_SANITIZE_THREAD __attribute__((no_sanitize_thread))
+ #endif
+ #endif
+#endif
+
extern "C"
{
struct CRYPTO_dynlock_value
@@ -50,14 +60,14 @@ class Crypto_API OpenSSLInitializer
public:
OpenSSLInitializer();
/// Automatically initialize OpenSSL on startup.
-
+
~OpenSSLInitializer();
/// Automatically shut down OpenSSL on exit.
-
- static void initialize();
+
+ POCO_CRYPT_NO_SANITIZE_THREAD static void initialize();
/// Initializes the OpenSSL machinery.
- static void uninitialize();
+ POCO_CRYPT_NO_SANITIZE_THREAD static void uninitialize();
/// Shuts down the OpenSSL machinery.
static bool isFIPSEnabled();
@@ -71,7 +81,7 @@ protected:
{
SEEDSIZE = 256
};
-
+
// OpenSSL multithreading support
static void lock(int mode, int n, const char* file, int line);
static unsigned long id();
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 181a3da9c5..aa17898a66 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.18.15
+Version: 3.18.16
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index 1da9bb152c..8b10140b34 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.18.15)
+VERSION(3.18.16)
LICENSE(Apache-2.0)
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
index 972003989c..d1872f4245 100644
--- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
@@ -418,12 +418,14 @@ class StreamReadMessage:
class InitRequest(IToProto):
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
consumer: str
+ auto_partitioning_support: bool
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
res.consumer = self.consumer
for settings in self.topics_read_settings:
res.topics_read_settings.append(settings.to_proto())
+ res.auto_partitioning_support = self.auto_partitioning_support
return res
@dataclass
@@ -696,6 +698,20 @@ class StreamReadMessage:
)
@dataclass
+ class EndPartitionSession(IFromProto):
+ partition_session_id: int
+ adjacent_partition_ids: List[int]
+ child_partition_ids: List[int]
+
+ @staticmethod
+ def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession):
+ return StreamReadMessage.EndPartitionSession(
+ partition_session_id=msg.partition_session_id,
+ adjacent_partition_ids=list(msg.adjacent_partition_ids),
+ child_partition_ids=list(msg.child_partition_ids),
+ )
+
+ @dataclass
class FromClient(IToProto):
client_message: "ReaderMessagesFromClientToServer"
@@ -774,6 +790,13 @@ class StreamReadMessage:
msg.partition_session_status_response
),
)
+ elif mess_type == "end_partition_session":
+ return StreamReadMessage.FromServer(
+ server_status=server_status,
+ server_message=StreamReadMessage.EndPartitionSession.from_proto(
+ msg.end_partition_session,
+ ),
+ )
else:
raise issues.UnexpectedGrpcMessage(
"Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type
@@ -798,6 +821,7 @@ ReaderMessagesFromServerToClient = Union[
UpdateTokenResponse,
StreamReadMessage.StartPartitionSessionRequest,
StreamReadMessage.StopPartitionSessionRequest,
+ StreamReadMessage.EndPartitionSession,
]
@@ -942,18 +966,130 @@ class AlterConsumer(IToProto, IFromPublic):
class PartitioningSettings(IToProto, IFromProto):
min_active_partitions: int
partition_count_limit: int
+ max_active_partitions: int
+ auto_partitioning_settings: AutoPartitioningSettings
@staticmethod
def from_proto(msg: ydb_topic_pb2.PartitioningSettings) -> "PartitioningSettings":
return PartitioningSettings(
min_active_partitions=msg.min_active_partitions,
partition_count_limit=msg.partition_count_limit,
+ max_active_partitions=msg.max_active_partitions,
+ auto_partitioning_settings=AutoPartitioningSettings.from_proto(msg.auto_partitioning_settings),
)
def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
+ auto_partitioning_settings = None
+ if self.auto_partitioning_settings is not None:
+ auto_partitioning_settings = self.auto_partitioning_settings.to_proto()
+
return ydb_topic_pb2.PartitioningSettings(
min_active_partitions=self.min_active_partitions,
partition_count_limit=self.partition_count_limit,
+ max_active_partitions=self.max_active_partitions,
+ auto_partitioning_settings=auto_partitioning_settings,
+ )
+
+
+class AutoPartitioningStrategy(int, IFromProto, IFromPublic, IToPublic):
+ UNSPECIFIED = 0
+ DISABLED = 1
+ SCALE_UP = 2
+ SCALE_UP_AND_DOWN = 3
+ PAUSED = 4
+
+ @staticmethod
+ def from_public(
+ strategy: Optional[ydb_topic_public_types.PublicAutoPartitioningStrategy],
+ ) -> Optional["AutoPartitioningStrategy"]:
+ if strategy is None:
+ return None
+
+ return AutoPartitioningStrategy(strategy)
+
+ @staticmethod
+ def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]:
+ if code is None:
+ return None
+
+ return AutoPartitioningStrategy(code)
+
+ def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy:
+ try:
+ return ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self))
+ except KeyError:
+ return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED
+
+
+@dataclass
+class AutoPartitioningSettings(IToProto, IFromProto, IFromPublic, IToPublic):
+ strategy: AutoPartitioningStrategy
+ partition_write_speed: AutoPartitioningWriteSpeedStrategy
+
+ @staticmethod
+ def from_public(
+ settings: Optional[ydb_topic_public_types.PublicAutoPartitioningSettings],
+ ) -> Optional[AutoPartitioningSettings]:
+ if not settings:
+ return None
+
+ return AutoPartitioningSettings(
+ strategy=settings.strategy,
+ partition_write_speed=AutoPartitioningWriteSpeedStrategy(
+ stabilization_window=settings.stabilization_window,
+ up_utilization_percent=settings.up_utilization_percent,
+ down_utilization_percent=settings.down_utilization_percent,
+ ),
+ )
+
+ @staticmethod
+ def from_proto(msg: ydb_topic_pb2.AutoPartitioningSettings) -> AutoPartitioningSettings:
+ if msg is None:
+ return None
+
+ return AutoPartitioningSettings(
+ strategy=AutoPartitioningStrategy.from_proto(msg.strategy),
+ partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto(msg.partition_write_speed),
+ )
+
+ def to_proto(self) -> ydb_topic_pb2.AutoPartitioningSettings:
+ return ydb_topic_pb2.AutoPartitioningSettings(
+ strategy=self.strategy, partition_write_speed=self.partition_write_speed.to_proto()
+ )
+
+ def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningSettings:
+ return ydb_topic_public_types.PublicAutoPartitioningSettings(
+ strategy=self.strategy.to_public(),
+ stabilization_window=self.partition_write_speed.stabilization_window,
+ up_utilization_percent=self.partition_write_speed.up_utilization_percent,
+ down_utilization_percent=self.partition_write_speed.down_utilization_percent,
+ )
+
+
+@dataclass
+class AutoPartitioningWriteSpeedStrategy(IToProto, IFromProto):
+ stabilization_window: Optional[datetime.timedelta]
+ up_utilization_percent: Optional[int]
+ down_utilization_percent: Optional[int]
+
+ def to_proto(self):
+ return ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy(
+ stabilization_window=proto_duration_from_timedelta(self.stabilization_window),
+ up_utilization_percent=self.up_utilization_percent,
+ down_utilization_percent=self.down_utilization_percent,
+ )
+
+ @staticmethod
+ def from_proto(
+ msg: Optional[ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy],
+ ) -> Optional[AutoPartitioningWriteSpeedStrategy]:
+ if msg is None:
+ return None
+
+ return AutoPartitioningWriteSpeedStrategy(
+ stabilization_window=timedelta_from_proto_duration(msg.stabilization_window),
+ up_utilization_percent=msg.up_utilization_percent,
+ down_utilization_percent=msg.down_utilization_percent,
)
@@ -961,11 +1097,65 @@ class PartitioningSettings(IToProto, IFromProto):
class AlterPartitioningSettings(IToProto):
set_min_active_partitions: Optional[int]
set_partition_count_limit: Optional[int]
+ set_max_active_partitions: Optional[int]
+ alter_auto_partitioning_settings: Optional[AlterAutoPartitioningSettings]
def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
+ alter_auto_partitioning_settings = None
+ if self.alter_auto_partitioning_settings is not None:
+ alter_auto_partitioning_settings = self.alter_auto_partitioning_settings.to_proto()
+
return ydb_topic_pb2.AlterPartitioningSettings(
set_min_active_partitions=self.set_min_active_partitions,
set_partition_count_limit=self.set_partition_count_limit,
+ set_max_active_partitions=self.set_max_active_partitions,
+ alter_auto_partitioning_settings=alter_auto_partitioning_settings,
+ )
+
+
+@dataclass
+class AlterAutoPartitioningSettings(IToProto, IFromPublic):
+ set_strategy: Optional[AutoPartitioningStrategy]
+ set_partition_write_speed: Optional[AlterAutoPartitioningWriteSpeedStrategy]
+
+ @staticmethod
+ def from_public(
+ settings: Optional[ydb_topic_public_types.PublicAlterAutoPartitioningSettings],
+ ) -> Optional[AlterAutoPartitioningSettings]:
+ if not settings:
+ return None
+
+ return AlterAutoPartitioningSettings(
+ set_strategy=settings.set_strategy,
+ set_partition_write_speed=AlterAutoPartitioningWriteSpeedStrategy(
+ set_stabilization_window=settings.set_stabilization_window,
+ set_up_utilization_percent=settings.set_up_utilization_percent,
+ set_down_utilization_percent=settings.set_down_utilization_percent,
+ ),
+ )
+
+ def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningSettings:
+ set_partition_write_speed = None
+ if self.set_partition_write_speed:
+ set_partition_write_speed = self.set_partition_write_speed.to_proto()
+
+ return ydb_topic_pb2.AlterAutoPartitioningSettings(
+ set_strategy=self.set_strategy,
+ set_partition_write_speed=set_partition_write_speed,
+ )
+
+
+@dataclass
+class AlterAutoPartitioningWriteSpeedStrategy(IToProto):
+ set_stabilization_window: Optional[datetime.timedelta]
+ set_up_utilization_percent: Optional[int]
+ set_down_utilization_percent: Optional[int]
+
+ def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy:
+ return ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy(
+ set_stabilization_window=proto_duration_from_timedelta(self.set_stabilization_window),
+ set_up_utilization_percent=self.set_up_utilization_percent,
+ set_down_utilization_percent=self.set_down_utilization_percent,
)
@@ -992,7 +1182,7 @@ class MeteringMode(int, IFromProto, IFromPublic, IToPublic):
def to_public(self) -> ydb_topic_public_types.PublicMeteringMode:
try:
- ydb_topic_public_types.PublicMeteringMode(int(self))
+ return ydb_topic_public_types.PublicMeteringMode(int(self))
except KeyError:
return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED
@@ -1011,9 +1201,13 @@ class CreateTopicRequest(IToProto, IFromPublic):
metering_mode: "MeteringMode"
def to_proto(self) -> ydb_topic_pb2.CreateTopicRequest:
+ partitioning_settings = None
+ if self.partitioning_settings is not None:
+ partitioning_settings = self.partitioning_settings.to_proto()
+
return ydb_topic_pb2.CreateTopicRequest(
path=self.path,
- partitioning_settings=self.partitioning_settings.to_proto(),
+ partitioning_settings=partitioning_settings,
retention_period=proto_duration_from_timedelta(self.retention_period),
retention_storage_mb=self.retention_storage_mb,
supported_codecs=self.supported_codecs.to_proto(),
@@ -1038,11 +1232,17 @@ class CreateTopicRequest(IToProto, IFromPublic):
consumer = ydb_topic_public_types.PublicConsumer(name=consumer)
consumers.append(Consumer.from_public(consumer))
+ auto_partitioning_settings = None
+ if req.auto_partitioning_settings is not None:
+ auto_partitioning_settings = AutoPartitioningSettings.from_public(req.auto_partitioning_settings)
+
return CreateTopicRequest(
path=req.path,
partitioning_settings=PartitioningSettings(
min_active_partitions=req.min_active_partitions,
partition_count_limit=req.partition_count_limit,
+ max_active_partitions=req.max_active_partitions,
+ auto_partitioning_settings=auto_partitioning_settings,
),
retention_period=req.retention_period,
retention_storage_mb=req.retention_storage_mb,
@@ -1113,6 +1313,12 @@ class AlterTopicRequest(IToProto, IFromPublic):
consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer)
alter_consumers.append(AlterConsumer.from_public(consumer))
+ alter_auto_partitioning_settings = None
+ if req.alter_auto_partitioning_settings is not None:
+ alter_auto_partitioning_settings = AlterAutoPartitioningSettings.from_public(
+ req.alter_auto_partitioning_settings
+ )
+
drop_consumers = req.drop_consumers if req.drop_consumers else []
return AlterTopicRequest(
@@ -1120,6 +1326,8 @@ class AlterTopicRequest(IToProto, IFromPublic):
alter_partitioning_settings=AlterPartitioningSettings(
set_min_active_partitions=req.set_min_active_partitions,
set_partition_count_limit=req.set_partition_count_limit,
+ set_max_active_partitions=req.set_max_active_partitions,
+ alter_auto_partitioning_settings=alter_auto_partitioning_settings,
),
add_consumers=add_consumers,
set_retention_period=req.set_retention_period,
@@ -1180,6 +1388,8 @@ class DescribeTopicResult(IFromProtoWithProtoType, IToPublic):
return ydb_topic_public_types.PublicDescribeTopicResult(
self=scheme._wrap_scheme_entry(self.self_proto),
min_active_partitions=self.partitioning_settings.min_active_partitions,
+ max_active_partitions=self.partitioning_settings.max_active_partitions,
+ auto_partitioning_settings=self.partitioning_settings.auto_partitioning_settings.to_public(),
partition_count_limit=self.partitioning_settings.partition_count_limit,
partitions=list(map(DescribeTopicResult.PartitionInfo.to_public, self.partitions)),
retention_period=self.retention_period,
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
index 917dd53363..e3b118e9ca 100644
--- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
@@ -18,6 +18,7 @@ from ...scheme import SchemeEntry
class CreateTopicRequestParams:
path: str
min_active_partitions: Optional[int]
+ max_active_partitions: Optional[int]
partition_count_limit: Optional[int]
retention_period: Optional[datetime.timedelta]
retention_storage_mb: Optional[int]
@@ -27,12 +28,14 @@ class CreateTopicRequestParams:
attributes: Optional[Dict[str, str]]
consumers: Optional[List[Union["PublicConsumer", str]]]
metering_mode: Optional["PublicMeteringMode"]
+ auto_partitioning_settings: Optional["PublicAutoPartitioningSettings"]
@dataclass
class AlterTopicRequestParams:
path: str
set_min_active_partitions: Optional[int]
+ set_max_active_partitions: Optional[int]
set_partition_count_limit: Optional[int]
add_consumers: Optional[List[Union["PublicConsumer", str]]]
alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]]
@@ -44,6 +47,7 @@ class AlterTopicRequestParams:
set_retention_period: Optional[datetime.timedelta]
set_retention_storage_mb: Optional[int]
set_supported_codecs: Optional[List[Union["PublicCodec", int]]]
+ alter_auto_partitioning_settings: Optional["PublicAlterAutoPartitioningSettings"]
class PublicCodec(int):
@@ -67,6 +71,30 @@ class PublicMeteringMode(IntEnum):
REQUEST_UNITS = 2
+class PublicAutoPartitioningStrategy(IntEnum):
+ UNSPECIFIED = 0
+ DISABLED = 1
+ SCALE_UP = 2
+ SCALE_UP_AND_DOWN = 3
+ PAUSED = 4
+
+
+@dataclass
+class PublicAutoPartitioningSettings:
+ strategy: Optional["PublicAutoPartitioningStrategy"] = None
+ stabilization_window: Optional[datetime.timedelta] = None
+ down_utilization_percent: Optional[int] = None
+ up_utilization_percent: Optional[int] = None
+
+
+@dataclass
+class PublicAlterAutoPartitioningSettings:
+ set_strategy: Optional["PublicAutoPartitioningStrategy"] = None
+ set_stabilization_window: Optional[datetime.timedelta] = None
+ set_down_utilization_percent: Optional[int] = None
+ set_up_utilization_percent: Optional[int] = None
+
+
@dataclass
class PublicConsumer:
name: str
@@ -137,6 +165,9 @@ class PublicDescribeTopicResult:
min_active_partitions: int
"Minimum partition count auto merge would stop working at"
+ max_active_partitions: int
+ "Minimum partition count auto split would stop working at"
+
partition_count_limit: int
"Limit for total partition count, including active (open for write) and read-only partitions"
@@ -170,6 +201,8 @@ class PublicDescribeTopicResult:
topic_stats: "PublicDescribeTopicResult.TopicStats"
"Statistics of topic"
+ auto_partitioning_settings: "PublicAutoPartitioningSettings"
+
@dataclass
class PartitionInfo:
partition_id: int
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
index a9c811ac4f..b48501aff2 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
@@ -121,6 +121,16 @@ class PartitionSession:
def closed(self):
return self.state == PartitionSession.State.Stopped
+ def end(self):
+ if self.closed:
+ return
+
+ self.state = PartitionSession.State.Ended
+
+ @property
+ def ended(self):
+ return self.state == PartitionSession.State.Ended
+
def _ensure_not_closed(self):
if self.state == PartitionSession.State.Stopped:
raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()
@@ -129,6 +139,7 @@ class PartitionSession:
Active = 1
GracefulShutdown = 2
Stopped = 3
+ Ended = 4
@dataclass(order=True)
class CommitAckWaiter:
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
index b907ee2794..8bc12cc0d8 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
@@ -45,6 +45,7 @@ class PublicReaderSettings:
consumer: str
topic: TopicSelectorTypes
buffer_size_bytes: int = 50 * 1024 * 1024
+ auto_partitioning_support: bool = True
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
"""decoders: map[codec_code] func(encoded_bytes)->decoded_bytes"""
@@ -77,6 +78,7 @@ class PublicReaderSettings:
return StreamReadMessage.InitRequest(
topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore
consumer=self.consumer,
+ auto_partitioning_support=self.auto_partitioning_support,
)
def _retry_settings(self) -> RetrySettings:
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
index e407fe01da..7061b4e449 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -388,6 +388,14 @@ class ReaderStream:
partition_session_id, batch = self._message_batches.popitem(last=False)
return partition_session_id, batch
+ def _return_batch_to_queue(self, part_sess_id: int, batch: datatypes.PublicBatch):
+ self._message_batches[part_sess_id] = batch
+
+ # In case of auto-split we should return all parent messages ASAP
+ # without queue rotation to prevent child's messages before parent's.
+ if part_sess_id in self._partition_sessions and self._partition_sessions[part_sess_id].ended:
+ self._message_batches.move_to_end(part_sess_id, last=False)
+
def receive_batch_nowait(self, max_messages: Optional[int] = None):
if self._get_first_error():
raise self._get_first_error()
@@ -403,7 +411,8 @@ class ReaderStream:
cutted_batch = batch._pop_batch(message_count=max_messages)
- self._message_batches[part_sess_id] = batch
+ self._return_batch_to_queue(part_sess_id, batch)
+
self._buffer_release_bytes(cutted_batch._bytes_size)
return cutted_batch
@@ -423,7 +432,7 @@ class ReaderStream:
self._buffer_release_bytes(batch._bytes_size)
else:
# TODO: we should somehow release bytes from single message as well
- self._message_batches[part_sess_id] = batch
+ self._return_batch_to_queue(part_sess_id, batch)
return message
@@ -498,6 +507,12 @@ class ReaderStream:
):
self._on_partition_session_stop(message.server_message)
+ elif isinstance(
+ message.server_message,
+ StreamReadMessage.EndPartitionSession,
+ ):
+ self._on_end_partition_session(message.server_message)
+
elif isinstance(message.server_message, UpdateTokenResponse):
self._update_token_event.set()
@@ -575,6 +590,16 @@ class ReaderStream:
)
)
+ def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession):
+ logger.debug(
+ f"End partition session with id: {message.partition_session_id}, "
+ f"child partitions: {message.child_partition_ids}"
+ )
+
+ if message.partition_session_id in self._partition_sessions:
+ # Mark partition session as ended not to shuffle messages.
+ self._partition_sessions[message.partition_session_id].end()
+
def _on_read_response(self, message: StreamReadMessage.ReadResponse):
self._buffer_consume_bytes(message.bytes_size)
diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py
index 456896dbb5..f6a84eb0b1 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py
@@ -13,9 +13,11 @@ from ...retries import (
RetrySettings,
retry_operation_async,
)
+from ...query.base import BaseQueryTxMode
from ...query.base import QueryClientSettings
from ... import convert
from ..._grpc.grpcwrapper import common_utils
+from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
logger = logging.getLogger(__name__)
@@ -122,6 +124,39 @@ class QuerySessionPool:
return await retry_operation_async(wrapped_callee, retry_settings)
+ async def retry_tx_async(
+ self,
+ callee: Callable,
+ tx_mode: Optional[BaseQueryTxMode] = None,
+ retry_settings: Optional[RetrySettings] = None,
+ *args,
+ **kwargs,
+ ):
+ """Special interface to execute a bunch of commands with transaction in a safe, retriable way.
+
+ :param callee: A function, that works with session.
+ :param tx_mode: Transaction mode, which is a one from the following choises:
+ 1) QuerySerializableReadWrite() which is default mode;
+ 2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
+ 3) QuerySnapshotReadOnly();
+ 4) QueryStaleReadOnly().
+ :param retry_settings: RetrySettings object.
+
+ :return: Result sets or exception in case of execution errors.
+ """
+
+ tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
+ retry_settings = RetrySettings() if retry_settings is None else retry_settings
+
+ async def wrapped_callee():
+ async with self.checkout() as session:
+ async with session.transaction(tx_mode=tx_mode) as tx:
+ result = await callee(tx, *args, **kwargs)
+ await tx.commit()
+ return result
+
+ return await retry_operation_async(wrapped_callee, retry_settings)
+
async def execute_with_retries(
self,
query: str,
diff --git a/contrib/python/ydb/py3/ydb/import_client.py b/contrib/python/ydb/py3/ydb/import_client.py
index 830f10c5bb..9a01e5a508 100644
--- a/contrib/python/ydb/py3/ydb/import_client.py
+++ b/contrib/python/ydb/py3/ydb/import_client.py
@@ -32,7 +32,10 @@ class ImportProgress(enum.IntEnum):
def _initialize_progresses():
for key, value in ydb_import_pb2.ImportProgress.Progress.items():
- _progresses[value] = getattr(ImportProgress, key[len("PROGRESS_") :])
+ try:
+ _progresses[value] = getattr(ImportProgress, key[len("PROGRESS_") :])
+ except AttributeError:
+ pass
_initialize_progresses()
diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py
index f1fcd17360..e3775c4dd1 100644
--- a/contrib/python/ydb/py3/ydb/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/query/pool.py
@@ -8,6 +8,7 @@ import time
import threading
import queue
+from .base import BaseQueryTxMode
from .base import QueryClientSettings
from .session import (
QuerySession,
@@ -20,6 +21,7 @@ from .. import issues
from .. import convert
from ..settings import BaseRequestSettings
from .._grpc.grpcwrapper import common_utils
+from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
logger = logging.getLogger(__name__)
@@ -138,6 +140,39 @@ class QuerySessionPool:
return retry_operation_sync(wrapped_callee, retry_settings)
+ def retry_tx_sync(
+ self,
+ callee: Callable,
+ tx_mode: Optional[BaseQueryTxMode] = None,
+ retry_settings: Optional[RetrySettings] = None,
+ *args,
+ **kwargs,
+ ):
+ """Special interface to execute a bunch of commands with transaction in a safe, retriable way.
+
+ :param callee: A function, that works with session.
+ :param tx_mode: Transaction mode, which is a one from the following choises:
+ 1) QuerySerializableReadWrite() which is default mode;
+ 2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
+ 3) QuerySnapshotReadOnly();
+ 4) QueryStaleReadOnly().
+ :param retry_settings: RetrySettings object.
+
+ :return: Result sets or exception in case of execution errors.
+ """
+
+ tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
+ retry_settings = RetrySettings() if retry_settings is None else retry_settings
+
+ def wrapped_callee():
+ with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session:
+ with session.transaction(tx_mode=tx_mode) as tx:
+ result = callee(tx, *args, **kwargs)
+ tx.commit()
+ return result
+
+ return retry_operation_sync(wrapped_callee, retry_settings)
+
def execute_with_retries(
self,
query: str,
diff --git a/contrib/python/ydb/py3/ydb/scheme.py b/contrib/python/ydb/py3/ydb/scheme.py
index 04951b5eae..263d1c65d3 100644
--- a/contrib/python/ydb/py3/ydb/scheme.py
+++ b/contrib/python/ydb/py3/ydb/scheme.py
@@ -24,6 +24,10 @@ class SchemeEntryType(enum.IntEnum):
SEQUENCE = 15
REPLICATION = 16
TOPIC = 17
+ EXTERNAL_TABLE = 18
+ EXTERNAL_DATA_SOURCE = 19
+ VIEW = 20
+ RESOURCE_POOL = 21
@classmethod
def _missing_(cls, value):
@@ -103,6 +107,38 @@ class SchemeEntryType(enum.IntEnum):
"""
return entry == SchemeEntryType.DATABASE or entry == SchemeEntryType.DIRECTORY
+ @staticmethod
+ def is_external_table(entry):
+ """
+ :param entry: A scheme entry to check
+ :return: True if scheme entry is an external table and False otherwise
+ """
+ return entry == SchemeEntryType.EXTERNAL_TABLE
+
+ @staticmethod
+ def is_external_data_source(entry):
+ """
+ :param entry: A scheme entry to check
+ :return: True if scheme entry is an external data source and False otherwise
+ """
+ return entry == SchemeEntryType.EXTERNAL_DATA_SOURCE
+
+ @staticmethod
+ def is_external_view(entry):
+ """
+ :param entry: A scheme entry to check
+ :return: True if scheme entry is a view and False otherwise
+ """
+ return entry == SchemeEntryType.VIEW
+
+ @staticmethod
+ def is_external_resource_pool(entry):
+ """
+ :param entry: A scheme entry to check
+ :return: True if scheme entry is a resource pool and False otherwise
+ """
+ return entry == SchemeEntryType.RESOURCE_POOL
+
class SchemeEntry(object):
__slots__ = (
@@ -185,6 +221,30 @@ class SchemeEntry(object):
"""
return SchemeEntryType.is_coordination_node(self.type)
+ def is_external_table(self):
+ """
+ :return: True if scheme entry is an external table and False otherwise
+ """
+ return SchemeEntryType.is_external_table(self.type)
+
+ def is_external_data_source(self):
+ """
+ :return: True if scheme entry is an external data source and False otherwise
+ """
+ return SchemeEntryType.is_external_data_source(self.type)
+
+ def is_view(self):
+ """
+ :return: True if scheme entry is a view and False otherwise
+ """
+ return SchemeEntryType.is_view(self.type)
+
+ def is_resource_pool(self):
+ """
+ :return: True if scheme entry is a resource pool and False otherwise
+ """
+ return SchemeEntryType.is_resource_pool(self.type)
+
class Directory(SchemeEntry):
__slots__ = ("children",)
diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py
index f0b872e297..55f4ea04c5 100644
--- a/contrib/python/ydb/py3/ydb/topic.py
+++ b/contrib/python/ydb/py3/ydb/topic.py
@@ -7,6 +7,9 @@ __all__ = [
"TopicCodec",
"TopicConsumer",
"TopicAlterConsumer",
+ "TopicAlterAutoPartitioningSettings",
+ "TopicAutoPartitioningSettings",
+ "TopicAutoPartitioningStrategy",
"TopicDescription",
"TopicError",
"TopicMeteringMode",
@@ -80,6 +83,9 @@ from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401
PublicConsumer as TopicConsumer,
PublicAlterConsumer as TopicAlterConsumer,
PublicMeteringMode as TopicMeteringMode,
+ PublicAutoPartitioningStrategy as TopicAutoPartitioningStrategy,
+ PublicAutoPartitioningSettings as TopicAutoPartitioningSettings,
+ PublicAlterAutoPartitioningSettings as TopicAlterAutoPartitioningSettings,
)
@@ -108,6 +114,7 @@ class TopicClientAsyncIO:
self,
path: str,
min_active_partitions: Optional[int] = None,
+ max_active_partitions: Optional[int] = None,
partition_count_limit: Optional[int] = None,
retention_period: Optional[datetime.timedelta] = None,
retention_storage_mb: Optional[int] = None,
@@ -117,6 +124,7 @@ class TopicClientAsyncIO:
attributes: Optional[Dict[str, str]] = None,
consumers: Optional[List[Union[TopicConsumer, str]]] = None,
metering_mode: Optional[TopicMeteringMode] = None,
+ auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None,
):
"""
create topic command
@@ -151,6 +159,7 @@ class TopicClientAsyncIO:
self,
path: str,
set_min_active_partitions: Optional[int] = None,
+ set_max_active_partitions: Optional[int] = None,
set_partition_count_limit: Optional[int] = None,
add_consumers: Optional[List[Union[TopicConsumer, str]]] = None,
alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None,
@@ -162,6 +171,7 @@ class TopicClientAsyncIO:
set_retention_period: Optional[datetime.timedelta] = None,
set_retention_storage_mb: Optional[int] = None,
set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
+ alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None,
):
"""
alter topic command
@@ -226,6 +236,7 @@ class TopicClientAsyncIO:
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
decoder_executor: Optional[concurrent.futures.Executor] = None,
+ auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
) -> TopicReaderAsyncIO:
if not decoder_executor:
@@ -305,6 +316,7 @@ class TopicClient:
self,
path: str,
min_active_partitions: Optional[int] = None,
+ max_active_partitions: Optional[int] = None,
partition_count_limit: Optional[int] = None,
retention_period: Optional[datetime.timedelta] = None,
retention_storage_mb: Optional[int] = None,
@@ -314,6 +326,7 @@ class TopicClient:
attributes: Optional[Dict[str, str]] = None,
consumers: Optional[List[Union[TopicConsumer, str]]] = None,
metering_mode: Optional[TopicMeteringMode] = None,
+ auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None,
):
"""
create topic command
@@ -350,6 +363,7 @@ class TopicClient:
self,
path: str,
set_min_active_partitions: Optional[int] = None,
+ set_max_active_partitions: Optional[int] = None,
set_partition_count_limit: Optional[int] = None,
add_consumers: Optional[List[Union[TopicConsumer, str]]] = None,
alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None,
@@ -361,6 +375,7 @@ class TopicClient:
set_retention_period: Optional[datetime.timedelta] = None,
set_retention_storage_mb: Optional[int] = None,
set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
+ alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None,
):
"""
alter topic command
@@ -431,6 +446,7 @@ class TopicClient:
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
+ auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
) -> TopicReader:
if not decoder_executor:
decoder_executor = self._executor
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index bdc80c2111..750aee1df8 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.18.15"
+VERSION = "3.18.16"