summaryrefslogtreecommitdiffstats
path: root/contrib/python
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-04-25 10:39:58 +0300
committerrobot-piglet <[email protected]>2025-04-25 11:07:55 +0300
commit7f0039151eb08c3d9e241b29eb43889da16b0162 (patch)
tree7a7185b8496d3bef4cc8134ec869b1c163b30696 /contrib/python
parenta64165ec06c774a4d2815a7ccf8c7fd66921a2c1 (diff)
Intermediate changes
commit_hash:9feae42eea92837e800e65cbcf9acc15f580e438
Diffstat (limited to 'contrib/python')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make3
-rw-r--r--contrib/python/ydb/py3/ydb/_apis.py1
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py21
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py4
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/events.py81
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py26
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py30
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py13
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/pool.py2
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/session.py13
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/transaction.py16
-rw-r--r--contrib/python/ydb/py3/ydb/iam/auth.py6
-rw-r--r--contrib/python/ydb/py3/ydb/query/__init__.py2
-rw-r--r--contrib/python/ydb/py3/ydb/query/base.py13
-rw-r--r--contrib/python/ydb/py3/ydb/query/pool.py2
-rw-r--r--contrib/python/ydb/py3/ydb/query/session.py31
-rw-r--r--contrib/python/ydb/py3/ydb/query/transaction.py31
-rw-r--r--contrib/python/ydb/py3/ydb/topic.py78
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
20 files changed, 318 insertions, 59 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 904414722ef..8a15b64dc4b 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.20.1
+Version: 3.21.0
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index fbc5d148f8a..b938db3247b 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.20.1)
+VERSION(3.21.0)
LICENSE(Apache-2.0)
@@ -40,6 +40,7 @@ PY_SRCS(
ydb/_topic_common/common.py
ydb/_topic_reader/__init__.py
ydb/_topic_reader/datatypes.py
+ ydb/_topic_reader/events.py
ydb/_topic_reader/topic_reader.py
ydb/_topic_reader/topic_reader_asyncio.py
ydb/_topic_reader/topic_reader_sync.py
diff --git a/contrib/python/ydb/py3/ydb/_apis.py b/contrib/python/ydb/py3/ydb/_apis.py
index fc6f16e287c..b0fa8f3cb22 100644
--- a/contrib/python/ydb/py3/ydb/_apis.py
+++ b/contrib/python/ydb/py3/ydb/_apis.py
@@ -116,6 +116,7 @@ class TopicService(object):
StreamRead = "StreamRead"
StreamWrite = "StreamWrite"
UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction"
+ CommitOffset = "CommitOffset"
class QueryService(object):
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
index 0f8a0f03a7a..6b3594b8da4 100644
--- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
@@ -136,6 +136,22 @@ class UpdateTokenResponse(IFromProto):
return UpdateTokenResponse()
+@dataclass
+class CommitOffsetRequest(IToProto):
+ path: str
+ consumer: str
+ partition_id: int
+ offset: int
+
+ def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
+ return ydb_topic_pb2.CommitOffsetRequest(
+ path=self.path,
+ consumer=self.consumer,
+ partition_id=self.partition_id,
+ offset=self.offset,
+ )
+
+
########################################################################################################################
# StreamWrite
########################################################################################################################
@@ -438,12 +454,13 @@ class StreamReadMessage:
@dataclass
class InitRequest(IToProto):
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
- consumer: str
+ consumer: Optional[str]
auto_partitioning_support: bool
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
- res.consumer = self.consumer
+ if self.consumer is not None:
+ res.consumer = self.consumer
for settings in self.topics_read_settings:
res.topics_read_settings.append(settings.to_proto())
res.auto_partitioning_support = self.auto_partitioning_support
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
index 74f06a086fc..737fa4149e2 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
@@ -56,6 +56,10 @@ class PublicMessage(ICommittable, ISessionAlive):
def alive(self) -> bool:
return not self._partition_session.closed
+ @property
+ def partition_id(self) -> int:
+ return self._partition_session.partition_id
+
@dataclass
class PartitionSession:
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/events.py b/contrib/python/ydb/py3/ydb/_topic_reader/events.py
new file mode 100644
index 00000000000..b229713c957
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/events.py
@@ -0,0 +1,81 @@
+import asyncio
+from dataclasses import dataclass
+from typing import Awaitable, Union
+
+from ..issues import ClientInternalError
+
+__all__ = [
+ "OnCommit",
+ "OnPartitionGetStartOffsetRequest",
+ "OnPartitionGetStartOffsetResponse",
+ "OnInitPartition",
+ "OnShutdownPartition",
+ "EventHandler",
+]
+
+
+class BaseReaderEvent:
+ pass
+
+
+@dataclass
+class OnCommit(BaseReaderEvent):
+ topic: str
+ offset: int
+
+
+@dataclass
+class OnPartitionGetStartOffsetRequest(BaseReaderEvent):
+ topic: str
+ partition_id: int
+
+
+@dataclass
+class OnPartitionGetStartOffsetResponse:
+ start_offset: int
+
+
+class OnInitPartition(BaseReaderEvent):
+ pass
+
+
+class OnShutdownPartition:
+ pass
+
+
+TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None]
+
+
+class EventHandler:
+ def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]:
+ pass
+
+ def on_partition_get_start_offset(
+ self,
+ event: OnPartitionGetStartOffsetRequest,
+ ) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]:
+ pass
+
+ def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]:
+ pass
+
+ def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]:
+ pass
+
+ async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]:
+ f = None
+ if isinstance(event, OnCommit):
+ f = self.on_commit
+ elif isinstance(event, OnPartitionGetStartOffsetRequest):
+ f = self.on_partition_get_start_offset
+ elif isinstance(event, OnInitPartition):
+ f = self.on_init_partition
+ elif isinstance(event, OnShutdownPartition):
+ f = self.on_shutdown_partition
+ else:
+ raise ClientInternalError("Unsupported topic reader event")
+
+ if asyncio.iscoroutinefunction(f):
+ return await f(event)
+
+ return f(event)
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
index 8bc12cc0d87..d477c9ca1bb 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
@@ -10,6 +10,7 @@ from typing import (
Callable,
)
+from .events import EventHandler
from ..retries import RetrySettings
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
@@ -20,6 +21,7 @@ class PublicTopicSelector:
partitions: Optional[Union[int, List[int]]] = None
read_from: Optional[datetime.datetime] = None
max_lag: Optional[datetime.timedelta] = None
+ read_offset: Optional[int] = None
def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings:
partitions = self.partitions
@@ -42,7 +44,7 @@ TopicSelectorTypes = Union[str, PublicTopicSelector, List[Union[str, PublicTopic
@dataclass
class PublicReaderSettings:
- consumer: str
+ consumer: Optional[str]
topic: TopicSelectorTypes
buffer_size_bytes: int = 50 * 1024 * 1024
auto_partitioning_support: bool = True
@@ -53,13 +55,14 @@ class PublicReaderSettings:
# decoder_executor, must be set for handle non raw messages
decoder_executor: Optional[concurrent.futures.Executor] = None
update_token_interval: Union[int, float] = 3600
+ event_handler: Optional[EventHandler] = None
def __post_init__(self):
# check possible create init message
_ = self._init_message()
def _init_message(self) -> StreamReadMessage.InitRequest:
- if not isinstance(self.consumer, str):
+ if self.consumer is not None and not isinstance(self.consumer, str):
raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer))
if isinstance(self.topic, list):
@@ -85,25 +88,6 @@ class PublicReaderSettings:
return RetrySettings(idempotent=True)
-class Events:
- class OnCommit:
- topic: str
- offset: int
-
- class OnPartitionGetStartOffsetRequest:
- topic: str
- partition_id: int
-
- class OnPartitionGetStartOffsetResponse:
- start_offset: int
-
- class OnInitPartition:
- pass
-
- class OnShutdownPatition:
- pass
-
-
class RetryPolicy:
connection_timeout_sec: float
overload_timeout_sec: float
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
index 87012554ef5..34c52108e17 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -15,6 +15,7 @@ from .._utilities import AtomicCounter
from ..aio import Driver
from ..issues import Error as YdbError, _process_response
from . import datatypes
+from . import events
from . import topic_reader
from .._grpc.grpcwrapper.common_utils import (
IGrpcWrapperAsyncIO,
@@ -72,6 +73,7 @@ class TopicReaderClosedError(TopicReaderError):
class PublicAsyncIOReader:
_loop: asyncio.AbstractEventLoop
_closed: bool
+ _settings: topic_reader.PublicReaderSettings
_reconnector: ReaderReconnector
_parent: typing.Any # need for prevent close parent client by GC
@@ -84,6 +86,7 @@ class PublicAsyncIOReader:
):
self._loop = asyncio.get_running_loop()
self._closed = False
+ self._settings = settings
self._reconnector = ReaderReconnector(driver, settings, self._loop)
self._parent = _parent
@@ -156,6 +159,9 @@ class PublicAsyncIOReader:
For the method no way check the commit result
(for example if lost connection - commits will not re-send and committed messages will receive again).
"""
+ if self._settings.consumer is None:
+ raise issues.Error("Commit operations are not supported for topic reader without consumer.")
+
try:
self._reconnector.commit(batch)
except PublicTopicReaderPartitionExpiredError:
@@ -171,6 +177,9 @@ class PublicAsyncIOReader:
before receive commit ack. Message may be acked or not (if not - it will send in other read session,
to this or other reader).
"""
+ if self._settings.consumer is None:
+ raise issues.Error("Commit operations are not supported for topic reader without consumer.")
+
waiter = self._reconnector.commit(batch)
await waiter.future
@@ -393,6 +402,7 @@ class ReaderStream:
_update_token_interval: Union[int, float]
_update_token_event: asyncio.Event
_get_token_function: Callable[[], str]
+ _settings: topic_reader.PublicReaderSettings
def __init__(
self,
@@ -425,6 +435,8 @@ class ReaderStream:
self._get_token_function = get_token_function
self._update_token_event = asyncio.Event()
+ self._settings = settings
+
@staticmethod
async def create(
reader_reconnector_id: int,
@@ -615,7 +627,7 @@ class ReaderStream:
message.server_message,
StreamReadMessage.StartPartitionSessionRequest,
):
- self._on_start_partition_session(message.server_message)
+ await self._on_start_partition_session(message.server_message)
elif isinstance(
message.server_message,
@@ -660,7 +672,7 @@ class ReaderStream:
finally:
self._update_token_event.clear()
- def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest):
+ async def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest):
try:
if message.partition_session.partition_session_id in self._partition_sessions:
raise TopicReaderError(
@@ -676,11 +688,23 @@ class ReaderStream:
reader_reconnector_id=self._reader_reconnector_id,
reader_stream_id=self._id,
)
+
+ read_offset = None
+
+ if self._settings.event_handler is not None:
+ resp = await self._settings.event_handler._dispatch(
+ events.OnPartitionGetStartOffsetRequest(
+ message.partition_session.path,
+ message.partition_session.partition_id,
+ )
+ )
+ read_offset = None if resp is None else resp.start_offset
+
self._stream.write(
StreamReadMessage.FromClient(
client_message=StreamReadMessage.StartPartitionSessionResponse(
partition_session_id=message.partition_session.partition_session_id,
- read_offset=None,
+ read_offset=read_offset,
commit_offset=None,
)
),
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py
index 31f28899271..bb2fc2a34b0 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py
@@ -4,6 +4,7 @@ import logging
import typing
from typing import List, Union, Optional
+from ydb import issues
from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType
from ydb._topic_common.common import (
_get_shared_event_loop,
@@ -31,6 +32,7 @@ class TopicReaderSync:
_caller: CallFromSyncToAsync
_async_reader: PublicAsyncIOReader
_closed: bool
+ _settings: PublicReaderSettings
_parent: typing.Any # need for prevent stop the client by GC
def __init__(
@@ -55,6 +57,8 @@ class TopicReaderSync:
self._async_reader = asyncio.run_coroutine_threadsafe(create_reader(), loop).result()
+ self._settings = settings
+
self._parent = _parent
def __del__(self):
@@ -154,6 +158,9 @@ class TopicReaderSync:
"""
self._check_closed()
+ if self._settings.consumer is None:
+ raise issues.Error("Commit operations are not supported for topic reader without consumer.")
+
self._caller.call_sync(lambda: self._async_reader.commit(mess))
def commit_with_ack(
@@ -168,6 +175,9 @@ class TopicReaderSync:
"""
self._check_closed()
+ if self._settings.consumer is None:
+ raise issues.Error("Commit operations are not supported for topic reader without consumer.")
+
return self._caller.unsafe_call_with_result(self._async_reader.commit_with_ack(mess), timeout)
def async_commit_with_ack(
@@ -178,6 +188,9 @@ class TopicReaderSync:
"""
self._check_closed()
+ if self._settings.consumer is None:
+ raise issues.Error("Commit operations are not supported for topic reader without consumer.")
+
return self._caller.unsafe_call_with_future(self._async_reader.commit_with_ack(mess))
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py
index f1ca68d1cf0..b691a1b111c 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py
@@ -142,7 +142,7 @@ class QuerySessionPool:
"""Special interface to execute a bunch of commands with transaction in a safe, retriable way.
:param callee: A function, that works with session.
- :param tx_mode: Transaction mode, which is a one from the following choises:
+ :param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py
index 0561de8c391..fe857878a54 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/session.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/session.py
@@ -117,15 +117,22 @@ class QuerySession(BaseQuerySession):
exec_mode: base.QueryExecMode = None,
concurrent_result_sets: bool = False,
settings: Optional[BaseRequestSettings] = None,
+ *,
+ stats_mode: Optional[base.QueryStatsMode] = None,
) -> AsyncResponseContextIterator:
"""Sends a query to Query Service
:param query: (YQL or SQL text) to be executed.
- :param syntax: Syntax of the query, which is a one from the following choises:
+ :param syntax: Syntax of the query, which is a one from the following choices:
1) QuerySyntax.YQL_V1, which is default;
2) QuerySyntax.PG.
:param parameters: dict with parameters and YDB types;
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+ :param stats_mode: Mode of query statistics to gather, which is a one from the following choices:
+ 1) QueryStatsMode:NONE, which is default;
+ 2) QueryStatsMode.BASIC;
+ 3) QueryStatsMode.FULL;
+ 4) QueryStatsMode.PROFILE;
:return: Iterator with result sets
"""
@@ -133,10 +140,11 @@ class QuerySession(BaseQuerySession):
stream_it = await self._execute_call(
query=query,
+ parameters=parameters,
commit_tx=True,
syntax=syntax,
exec_mode=exec_mode,
- parameters=parameters,
+ stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
settings=settings,
)
@@ -147,6 +155,7 @@ class QuerySession(BaseQuerySession):
rpc_state=None,
response_pb=resp,
session_state=self._state,
+ session=self,
settings=self._settings,
),
)
diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py
index f0547e5f01f..c9a6e445c93 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/transaction.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py
@@ -29,7 +29,7 @@ class QueryTxContext(BaseQueryTxContext):
:param driver: A driver instance
:param session_state: A state of session
- :param tx_mode: Transaction mode, which is a one from the following choises:
+ :param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
@@ -142,21 +142,28 @@ class QueryTxContext(BaseQueryTxContext):
exec_mode: Optional[base.QueryExecMode] = None,
concurrent_result_sets: Optional[bool] = False,
settings: Optional[BaseRequestSettings] = None,
+ *,
+ stats_mode: Optional[base.QueryStatsMode] = None,
) -> AsyncResponseContextIterator:
"""Sends a query to Query Service
:param query: (YQL or SQL text) to be executed.
:param parameters: dict with parameters and YDB types;
:param commit_tx: A special flag that allows transaction commit.
- :param syntax: Syntax of the query, which is a one from the following choises:
+ :param syntax: Syntax of the query, which is a one from the following choices:
1) QuerySyntax.YQL_V1, which is default;
2) QuerySyntax.PG.
- :param exec_mode: Exec mode of the query, which is a one from the following choises:
+ :param exec_mode: Exec mode of the query, which is a one from the following choices:
1) QueryExecMode.EXECUTE, which is default;
2) QueryExecMode.EXPLAIN;
3) QueryExecMode.VALIDATE;
4) QueryExecMode.PARSE.
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+ :param stats_mode: Mode of query statistics to gather, which is a one from the following choices:
+ 1) QueryStatsMode:NONE, which is default;
+ 2) QueryStatsMode.BASIC;
+ 3) QueryStatsMode.FULL;
+ 4) QueryStatsMode.PROFILE;
:return: Iterator with result sets
"""
@@ -164,10 +171,11 @@ class QueryTxContext(BaseQueryTxContext):
stream_it = await self._execute_call(
query=query,
+ parameters=parameters,
commit_tx=commit_tx,
syntax=syntax,
exec_mode=exec_mode,
- parameters=parameters,
+ stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
settings=settings,
)
diff --git a/contrib/python/ydb/py3/ydb/iam/auth.py b/contrib/python/ydb/py3/ydb/iam/auth.py
index c4096c096bf..688dededfca 100644
--- a/contrib/python/ydb/py3/ydb/iam/auth.py
+++ b/contrib/python/ydb/py3/ydb/iam/auth.py
@@ -3,7 +3,7 @@ from ydb import credentials, tracing
import grpc
import time
import abc
-from datetime import datetime
+from datetime import datetime, timezone
import json
import os
@@ -43,8 +43,8 @@ YANDEX_CLOUD_JWT_ALGORITHM = "PS256"
def get_jwt(account_id, access_key_id, private_key, jwt_expiration_timeout, algorithm, token_service_url, subject=None):
assert jwt is not None, "Install pyjwt library to use jwt tokens"
now = time.time()
- now_utc = datetime.utcfromtimestamp(now)
- exp_utc = datetime.utcfromtimestamp(now + jwt_expiration_timeout)
+ now_utc = datetime.fromtimestamp(now, timezone.utc)
+ exp_utc = datetime.fromtimestamp(now + jwt_expiration_timeout, timezone.utc)
payload = {
"iss": account_id,
"aud": token_service_url,
diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py
index 59dd799294b..76436f98353 100644
--- a/contrib/python/ydb/py3/ydb/query/__init__.py
+++ b/contrib/python/ydb/py3/ydb/query/__init__.py
@@ -7,6 +7,7 @@ __all__ = [
"QuerySessionPool",
"QueryClientSettings",
"QuerySession",
+ "QueryStatsMode",
"QueryTxContext",
]
@@ -14,6 +15,7 @@ import logging
from .base import (
QueryClientSettings,
+ QueryStatsMode,
)
from .session import QuerySession
diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py
index a5ebedd95b3..52a6312e3be 100644
--- a/contrib/python/ydb/py3/ydb/query/base.py
+++ b/contrib/python/ydb/py3/ydb/query/base.py
@@ -25,6 +25,7 @@ from ydb._grpc.grpcwrapper.common_utils import to_thread
if typing.TYPE_CHECKING:
from .transaction import BaseQueryTxContext
+ from .session import BaseQuerySession
class QuerySyntax(enum.IntEnum):
@@ -41,7 +42,7 @@ class QueryExecMode(enum.IntEnum):
EXECUTE = 50
-class StatsMode(enum.IntEnum):
+class QueryStatsMode(enum.IntEnum):
UNSPECIFIED = 0
NONE = 10
BASIC = 20
@@ -132,12 +133,13 @@ def create_execute_query_request(
tx_mode: Optional[BaseQueryTxMode],
syntax: Optional[QuerySyntax],
exec_mode: Optional[QueryExecMode],
+ stats_mode: Optional[QueryStatsMode],
parameters: Optional[dict],
concurrent_result_sets: Optional[bool],
) -> ydb_query.ExecuteQueryRequest:
syntax = QuerySyntax.YQL_V1 if not syntax else syntax
exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
- stats_mode = StatsMode.NONE # TODO: choise is not supported yet
+ stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode
tx_control = None
if not tx_id and not tx_mode:
@@ -189,6 +191,7 @@ def wrap_execute_query_response(
response_pb: _apis.ydb_query.ExecuteQueryResponsePart,
session_state: IQuerySessionState,
tx: Optional["BaseQueryTxContext"] = None,
+ session: Optional["BaseQuerySession"] = None,
commit_tx: Optional[bool] = False,
settings: Optional[QueryClientSettings] = None,
) -> convert.ResultSet:
@@ -198,6 +201,12 @@ def wrap_execute_query_response(
elif tx and response_pb.tx_meta and not tx.tx_id:
tx._move_to_beginned(response_pb.tx_meta.id)
+ if response_pb.HasField("exec_stats"):
+ if tx is not None:
+ tx._last_query_stats = response_pb.exec_stats
+ if session is not None:
+ session._last_query_stats = response_pb.exec_stats
+
if response_pb.HasField("result_set"):
return convert.ResultSet.from_message(response_pb.result_set, settings)
diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py
index b25f7db855c..1cf95ac0d13 100644
--- a/contrib/python/ydb/py3/ydb/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/query/pool.py
@@ -151,7 +151,7 @@ class QuerySessionPool:
"""Special interface to execute a bunch of commands with transaction in a safe, retriable way.
:param callee: A function, that works with session.
- :param tx_mode: Transaction mode, which is a one from the following choises:
+ :param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py
index 382c922d5e6..3cc6c13d4e9 100644
--- a/contrib/python/ydb/py3/ydb/query/session.py
+++ b/contrib/python/ydb/py3/ydb/query/session.py
@@ -147,6 +147,12 @@ class BaseQuerySession:
.with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
)
+ self._last_query_stats = None
+
+ @property
+ def last_query_stats(self):
+ return self._last_query_stats
+
def _get_client_settings(
self,
driver: common_utils.SupportedDriverType,
@@ -189,22 +195,26 @@ class BaseQuerySession:
def _execute_call(
self,
query: str,
+ parameters: dict = None,
commit_tx: bool = False,
syntax: base.QuerySyntax = None,
exec_mode: base.QueryExecMode = None,
- parameters: dict = None,
+ stats_mode: Optional[base.QueryStatsMode] = None,
concurrent_result_sets: bool = False,
settings: Optional[BaseRequestSettings] = None,
) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
+ self._last_query_stats = None
+
request = base.create_execute_query_request(
query=query,
- session_id=self._state.session_id,
+ parameters=parameters,
commit_tx=commit_tx,
+ session_id=self._state.session_id,
tx_mode=None,
tx_id=None,
syntax=syntax,
exec_mode=exec_mode,
- parameters=parameters,
+ stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
)
@@ -293,7 +303,7 @@ class QuerySession(BaseQuerySession):
def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContext:
"""Creates a transaction context manager with specified transaction mode.
- :param tx_mode: Transaction mode, which is a one from the following choises:
+ :param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
@@ -321,15 +331,22 @@ class QuerySession(BaseQuerySession):
exec_mode: base.QueryExecMode = None,
concurrent_result_sets: bool = False,
settings: Optional[BaseRequestSettings] = None,
+ *,
+ stats_mode: Optional[base.QueryStatsMode] = None,
) -> base.SyncResponseContextIterator:
"""Sends a query to Query Service
:param query: (YQL or SQL text) to be executed.
- :param syntax: Syntax of the query, which is a one from the following choises:
+ :param syntax: Syntax of the query, which is a one from the following choices:
1) QuerySyntax.YQL_V1, which is default;
2) QuerySyntax.PG.
:param parameters: dict with parameters and YDB types;
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+ :param stats_mode: Mode of query statistics to gather, which is a one from the following choices:
+ 1) QueryStatsMode:NONE, which is default;
+ 2) QueryStatsMode.BASIC;
+ 3) QueryStatsMode.FULL;
+ 4) QueryStatsMode.PROFILE;
:return: Iterator with result sets
"""
@@ -337,10 +354,11 @@ class QuerySession(BaseQuerySession):
stream_it = self._execute_call(
query=query,
+ parameters=parameters,
commit_tx=True,
syntax=syntax,
exec_mode=exec_mode,
- parameters=parameters,
+ stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
settings=settings,
)
@@ -351,6 +369,7 @@ class QuerySession(BaseQuerySession):
rpc_state=None,
response_pb=resp,
session_state=self._state,
+ session=self,
settings=self._settings,
),
)
diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py
index ae7642dbe21..008ac7c404f 100644
--- a/contrib/python/ydb/py3/ydb/query/transaction.py
+++ b/contrib/python/ydb/py3/ydb/query/transaction.py
@@ -197,7 +197,7 @@ class BaseQueryTxContext(base.CallbackHandler):
:param driver: A driver instance
:param session_state: A state of session
- :param tx_mode: Transaction mode, which is a one from the following choises:
+ :param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
@@ -210,6 +210,7 @@ class BaseQueryTxContext(base.CallbackHandler):
self.session = session
self._prev_stream = None
self._external_error = None
+ self._last_query_stats = None
@property
def session_id(self) -> str:
@@ -229,6 +230,10 @@ class BaseQueryTxContext(base.CallbackHandler):
"""
return self._tx_state.tx_id
+ @property
+ def last_query_stats(self):
+ return self._last_query_stats
+
def _tx_identity(self) -> _ydb_topic.TransactionIdentity:
if not self.tx_id:
raise RuntimeError("Unable to get tx identity without started tx.")
@@ -283,25 +288,29 @@ class BaseQueryTxContext(base.CallbackHandler):
def _execute_call(
self,
query: str,
+ parameters: Optional[dict],
commit_tx: Optional[bool],
syntax: Optional[base.QuerySyntax],
exec_mode: Optional[base.QueryExecMode],
- parameters: Optional[dict],
+ stats_mode: Optional[base.QueryStatsMode],
concurrent_result_sets: Optional[bool],
settings: Optional[BaseRequestSettings],
) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
self._tx_state._check_tx_ready_to_use()
self._check_external_error_set()
+ self._last_query_stats = None
+
request = base.create_execute_query_request(
query=query,
- session_id=self._session_state.session_id,
+ parameters=parameters,
commit_tx=commit_tx,
+ session_id=self._session_state.session_id,
tx_id=self._tx_state.tx_id,
tx_mode=self._tx_state.tx_mode,
syntax=syntax,
exec_mode=exec_mode,
- parameters=parameters,
+ stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
)
@@ -338,7 +347,7 @@ class QueryTxContext(BaseQueryTxContext):
:param driver: A driver instance
:param session_state: A state of session
- :param tx_mode: Transaction mode, which is a one from the following choises:
+ :param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
@@ -451,22 +460,29 @@ class QueryTxContext(BaseQueryTxContext):
exec_mode: Optional[base.QueryExecMode] = None,
concurrent_result_sets: Optional[bool] = False,
settings: Optional[BaseRequestSettings] = None,
+ *,
+ stats_mode: Optional[base.QueryStatsMode] = None,
) -> base.SyncResponseContextIterator:
"""Sends a query to Query Service
:param query: (YQL or SQL text) to be executed.
:param parameters: dict with parameters and YDB types;
:param commit_tx: A special flag that allows transaction commit.
- :param syntax: Syntax of the query, which is a one from the following choises:
+ :param syntax: Syntax of the query, which is a one from the following choices:
1) QuerySyntax.YQL_V1, which is default;
2) QuerySyntax.PG.
- :param exec_mode: Exec mode of the query, which is a one from the following choises:
+ :param exec_mode: Exec mode of the query, which is a one from the following choices:
1) QueryExecMode.EXECUTE, which is default;
2) QueryExecMode.EXPLAIN;
3) QueryExecMode.VALIDATE;
4) QueryExecMode.PARSE.
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
:param settings: An additional request settings BaseRequestSettings;
+ :param stats_mode: Mode of query statistics to gather, which is a one from the following choices:
+ 1) QueryStatsMode:NONE, which is default;
+ 2) QueryStatsMode.BASIC;
+ 3) QueryStatsMode.FULL;
+ 4) QueryStatsMode.PROFILE;
:return: Iterator with result sets
"""
@@ -477,6 +493,7 @@ class QueryTxContext(BaseQueryTxContext):
commit_tx=commit_tx,
syntax=syntax,
exec_mode=exec_mode,
+ stats_mode=stats_mode,
parameters=parameters,
concurrent_result_sets=concurrent_result_sets,
settings=settings,
diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py
index a501f9d2750..ceb82efb5dc 100644
--- a/contrib/python/ydb/py3/ydb/topic.py
+++ b/contrib/python/ydb/py3/ydb/topic.py
@@ -16,6 +16,7 @@ __all__ = [
"TopicReader",
"TopicReaderAsyncIO",
"TopicReaderBatch",
+ "TopicReaderEvents",
"TopicReaderMessage",
"TopicReaderSelector",
"TopicReaderSettings",
@@ -42,6 +43,8 @@ from . import aio, Credentials, _apis, issues
from . import driver
+from ._topic_reader import events as TopicReaderEvents
+
from ._topic_reader.datatypes import (
PublicBatch as TopicReaderBatch,
PublicMessage as TopicReaderMessage,
@@ -52,7 +55,9 @@ from ._topic_reader.topic_reader import (
PublicTopicSelector as TopicReaderSelector,
)
-from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader
+from ._topic_reader.topic_reader_sync import (
+ TopicReaderSync as TopicReader,
+)
from ._topic_reader.topic_reader_asyncio import (
PublicAsyncIOReader as TopicReaderAsyncIO,
@@ -240,7 +245,7 @@ class TopicClientAsyncIO:
def reader(
self,
topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]],
- consumer: str,
+ consumer: Optional[str],
buffer_size_bytes: int = 50 * 1024 * 1024,
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
# the func will be called from multiply threads in parallel
@@ -249,6 +254,7 @@ class TopicClientAsyncIO:
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
decoder_executor: Optional[concurrent.futures.Executor] = None,
auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
+ event_handler: Optional[TopicReaderEvents.EventHandler] = None,
) -> TopicReaderAsyncIO:
if not decoder_executor:
@@ -257,6 +263,23 @@ class TopicClientAsyncIO:
args = locals().copy()
del args["self"]
+ if consumer == "":
+ raise issues.Error(
+ "Consumer name could not be empty! To use reader without consumer specify consumer as None."
+ )
+
+ if consumer is None:
+ if not isinstance(topic, TopicReaderSelector) or topic.partitions is None:
+ raise issues.Error(
+ "To use reader without consumer it is required to specify partition_ids in topic selector."
+ )
+
+ if event_handler is None:
+ raise issues.Error(
+ "To use reader without consumer it is required to specify event_handler with "
+ "on_partition_get_start_offset method."
+ )
+
settings = TopicReaderSettings(**args)
return TopicReaderAsyncIO(self._driver, settings, _parent=self)
@@ -317,6 +340,21 @@ class TopicClientAsyncIO:
return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self)
+ async def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
+ req = _ydb_topic.CommitOffsetRequest(
+ path=path,
+ consumer=consumer,
+ partition_id=partition_id,
+ offset=offset,
+ )
+
+ await self._driver(
+ req.to_proto(),
+ _apis.TopicService.Stub,
+ _apis.TopicService.CommitOffset,
+ _wrap_operation,
+ )
+
def close(self):
if self._closed:
return
@@ -484,7 +522,7 @@ class TopicClient:
def reader(
self,
topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]],
- consumer: str,
+ consumer: Optional[str],
buffer_size_bytes: int = 50 * 1024 * 1024,
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
# the func will be called from multiply threads in parallel
@@ -493,13 +531,30 @@ class TopicClient:
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
+ event_handler: Optional[TopicReaderEvents.EventHandler] = None,
) -> TopicReader:
if not decoder_executor:
decoder_executor = self._executor
args = locals().copy()
del args["self"]
- self._check_closed()
+
+ if consumer == "":
+ raise issues.Error(
+ "Consumer name could not be empty! To use reader without consumer specify consumer as None."
+ )
+
+ if consumer is None:
+ if not isinstance(topic, TopicReaderSelector) or topic.partitions is None:
+ raise issues.Error(
+ "To use reader without consumer it is required to specify partition_ids in topic selector."
+ )
+
+ if event_handler is None:
+ raise issues.Error(
+ "To use reader without consumer it is required to specify event_handler with "
+ "on_partition_get_start_offset method."
+ )
settings = TopicReaderSettings(**args)
@@ -563,6 +618,21 @@ class TopicClient:
return TopicTxWriter(tx, self._driver, settings, _parent=self)
+ def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
+ req = _ydb_topic.CommitOffsetRequest(
+ path=path,
+ consumer=consumer,
+ partition_id=partition_id,
+ offset=offset,
+ )
+
+ self._driver(
+ req.to_proto(),
+ _apis.TopicService.Stub,
+ _apis.TopicService.CommitOffset,
+ _wrap_operation,
+ )
+
def close(self):
if self._closed:
return
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index 4a5c580f99f..6b71007009d 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.20.1"
+VERSION = "3.21.0"