diff options
author | robot-piglet <[email protected]> | 2025-04-25 10:39:58 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-04-25 11:07:55 +0300 |
commit | 7f0039151eb08c3d9e241b29eb43889da16b0162 (patch) | |
tree | 7a7185b8496d3bef4cc8134ec869b1c163b30696 /contrib/python | |
parent | a64165ec06c774a4d2815a7ccf8c7fd66921a2c1 (diff) |
Intermediate changes
commit_hash:9feae42eea92837e800e65cbcf9acc15f580e438
Diffstat (limited to 'contrib/python')
20 files changed, 318 insertions, 59 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 904414722ef..8a15b64dc4b 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.20.1 +Version: 3.21.0 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index fbc5d148f8a..b938db3247b 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.20.1) +VERSION(3.21.0) LICENSE(Apache-2.0) @@ -40,6 +40,7 @@ PY_SRCS( ydb/_topic_common/common.py ydb/_topic_reader/__init__.py ydb/_topic_reader/datatypes.py + ydb/_topic_reader/events.py ydb/_topic_reader/topic_reader.py ydb/_topic_reader/topic_reader_asyncio.py ydb/_topic_reader/topic_reader_sync.py diff --git a/contrib/python/ydb/py3/ydb/_apis.py b/contrib/python/ydb/py3/ydb/_apis.py index fc6f16e287c..b0fa8f3cb22 100644 --- a/contrib/python/ydb/py3/ydb/_apis.py +++ b/contrib/python/ydb/py3/ydb/_apis.py @@ -116,6 +116,7 @@ class TopicService(object): StreamRead = "StreamRead" StreamWrite = "StreamWrite" UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction" + CommitOffset = "CommitOffset" class QueryService(object): diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py index 0f8a0f03a7a..6b3594b8da4 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -136,6 +136,22 @@ class UpdateTokenResponse(IFromProto): return UpdateTokenResponse() +@dataclass +class CommitOffsetRequest(IToProto): + path: str + consumer: str + partition_id: int + offset: int + + def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest: + return ydb_topic_pb2.CommitOffsetRequest( + path=self.path, + consumer=self.consumer, + partition_id=self.partition_id, + offset=self.offset, + ) + + ######################################################################################################################## # StreamWrite ######################################################################################################################## @@ -438,12 +454,13 @@ class StreamReadMessage: @dataclass class InitRequest(IToProto): topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"] - consumer: str + consumer: Optional[str] auto_partitioning_support: bool def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest: res = ydb_topic_pb2.StreamReadMessage.InitRequest() - res.consumer = self.consumer + if self.consumer is not None: + res.consumer = self.consumer for settings in self.topics_read_settings: res.topics_read_settings.append(settings.to_proto()) res.auto_partitioning_support = self.auto_partitioning_support diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py index 74f06a086fc..737fa4149e2 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py @@ -56,6 +56,10 @@ class PublicMessage(ICommittable, ISessionAlive): def alive(self) -> bool: return not self._partition_session.closed + @property + def partition_id(self) -> int: + return self._partition_session.partition_id + @dataclass class PartitionSession: diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/events.py b/contrib/python/ydb/py3/ydb/_topic_reader/events.py new file mode 100644 index 00000000000..b229713c957 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/_topic_reader/events.py @@ -0,0 +1,81 @@ +import asyncio +from dataclasses import dataclass +from typing import Awaitable, Union + +from ..issues import ClientInternalError + +__all__ = [ + "OnCommit", + "OnPartitionGetStartOffsetRequest", + "OnPartitionGetStartOffsetResponse", + "OnInitPartition", + "OnShutdownPartition", + "EventHandler", +] + + +class BaseReaderEvent: + pass + + +@dataclass +class OnCommit(BaseReaderEvent): + topic: str + offset: int + + +@dataclass +class OnPartitionGetStartOffsetRequest(BaseReaderEvent): + topic: str + partition_id: int + + +@dataclass +class OnPartitionGetStartOffsetResponse: + start_offset: int + + +class OnInitPartition(BaseReaderEvent): + pass + + +class OnShutdownPartition: + pass + + +TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None] + + +class EventHandler: + def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]: + pass + + def on_partition_get_start_offset( + self, + event: OnPartitionGetStartOffsetRequest, + ) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]: + pass + + def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]: + pass + + def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]: + pass + + async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]: + f = None + if isinstance(event, OnCommit): + f = self.on_commit + elif isinstance(event, OnPartitionGetStartOffsetRequest): + f = self.on_partition_get_start_offset + elif isinstance(event, OnInitPartition): + f = self.on_init_partition + elif isinstance(event, OnShutdownPartition): + f = self.on_shutdown_partition + else: + raise ClientInternalError("Unsupported topic reader event") + + if asyncio.iscoroutinefunction(f): + return await f(event) + + return f(event) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py index 8bc12cc0d87..d477c9ca1bb 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py @@ -10,6 +10,7 @@ from typing import ( Callable, ) +from .events import EventHandler from ..retries import RetrySettings from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange @@ -20,6 +21,7 @@ class PublicTopicSelector: partitions: Optional[Union[int, List[int]]] = None read_from: Optional[datetime.datetime] = None max_lag: Optional[datetime.timedelta] = None + read_offset: Optional[int] = None def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings: partitions = self.partitions @@ -42,7 +44,7 @@ TopicSelectorTypes = Union[str, PublicTopicSelector, List[Union[str, PublicTopic @dataclass class PublicReaderSettings: - consumer: str + consumer: Optional[str] topic: TopicSelectorTypes buffer_size_bytes: int = 50 * 1024 * 1024 auto_partitioning_support: bool = True @@ -53,13 +55,14 @@ class PublicReaderSettings: # decoder_executor, must be set for handle non raw messages decoder_executor: Optional[concurrent.futures.Executor] = None update_token_interval: Union[int, float] = 3600 + event_handler: Optional[EventHandler] = None def __post_init__(self): # check possible create init message _ = self._init_message() def _init_message(self) -> StreamReadMessage.InitRequest: - if not isinstance(self.consumer, str): + if self.consumer is not None and not isinstance(self.consumer, str): raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer)) if isinstance(self.topic, list): @@ -85,25 +88,6 @@ class PublicReaderSettings: return RetrySettings(idempotent=True) -class Events: - class OnCommit: - topic: str - offset: int - - class OnPartitionGetStartOffsetRequest: - topic: str - partition_id: int - - class OnPartitionGetStartOffsetResponse: - start_offset: int - - class OnInitPartition: - pass - - class OnShutdownPatition: - pass - - class RetryPolicy: connection_timeout_sec: float overload_timeout_sec: float diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index 87012554ef5..34c52108e17 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -15,6 +15,7 @@ from .._utilities import AtomicCounter from ..aio import Driver from ..issues import Error as YdbError, _process_response from . import datatypes +from . import events from . import topic_reader from .._grpc.grpcwrapper.common_utils import ( IGrpcWrapperAsyncIO, @@ -72,6 +73,7 @@ class TopicReaderClosedError(TopicReaderError): class PublicAsyncIOReader: _loop: asyncio.AbstractEventLoop _closed: bool + _settings: topic_reader.PublicReaderSettings _reconnector: ReaderReconnector _parent: typing.Any # need for prevent close parent client by GC @@ -84,6 +86,7 @@ class PublicAsyncIOReader: ): self._loop = asyncio.get_running_loop() self._closed = False + self._settings = settings self._reconnector = ReaderReconnector(driver, settings, self._loop) self._parent = _parent @@ -156,6 +159,9 @@ class PublicAsyncIOReader: For the method no way check the commit result (for example if lost connection - commits will not re-send and committed messages will receive again). """ + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + try: self._reconnector.commit(batch) except PublicTopicReaderPartitionExpiredError: @@ -171,6 +177,9 @@ class PublicAsyncIOReader: before receive commit ack. Message may be acked or not (if not - it will send in other read session, to this or other reader). """ + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + waiter = self._reconnector.commit(batch) await waiter.future @@ -393,6 +402,7 @@ class ReaderStream: _update_token_interval: Union[int, float] _update_token_event: asyncio.Event _get_token_function: Callable[[], str] + _settings: topic_reader.PublicReaderSettings def __init__( self, @@ -425,6 +435,8 @@ class ReaderStream: self._get_token_function = get_token_function self._update_token_event = asyncio.Event() + self._settings = settings + @staticmethod async def create( reader_reconnector_id: int, @@ -615,7 +627,7 @@ class ReaderStream: message.server_message, StreamReadMessage.StartPartitionSessionRequest, ): - self._on_start_partition_session(message.server_message) + await self._on_start_partition_session(message.server_message) elif isinstance( message.server_message, @@ -660,7 +672,7 @@ class ReaderStream: finally: self._update_token_event.clear() - def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest): + async def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest): try: if message.partition_session.partition_session_id in self._partition_sessions: raise TopicReaderError( @@ -676,11 +688,23 @@ class ReaderStream: reader_reconnector_id=self._reader_reconnector_id, reader_stream_id=self._id, ) + + read_offset = None + + if self._settings.event_handler is not None: + resp = await self._settings.event_handler._dispatch( + events.OnPartitionGetStartOffsetRequest( + message.partition_session.path, + message.partition_session.partition_id, + ) + ) + read_offset = None if resp is None else resp.start_offset + self._stream.write( StreamReadMessage.FromClient( client_message=StreamReadMessage.StartPartitionSessionResponse( partition_session_id=message.partition_session.partition_session_id, - read_offset=None, + read_offset=read_offset, commit_offset=None, ) ), diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py index 31f28899271..bb2fc2a34b0 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py @@ -4,6 +4,7 @@ import logging import typing from typing import List, Union, Optional +from ydb import issues from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType from ydb._topic_common.common import ( _get_shared_event_loop, @@ -31,6 +32,7 @@ class TopicReaderSync: _caller: CallFromSyncToAsync _async_reader: PublicAsyncIOReader _closed: bool + _settings: PublicReaderSettings _parent: typing.Any # need for prevent stop the client by GC def __init__( @@ -55,6 +57,8 @@ class TopicReaderSync: self._async_reader = asyncio.run_coroutine_threadsafe(create_reader(), loop).result() + self._settings = settings + self._parent = _parent def __del__(self): @@ -154,6 +158,9 @@ class TopicReaderSync: """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + self._caller.call_sync(lambda: self._async_reader.commit(mess)) def commit_with_ack( @@ -168,6 +175,9 @@ class TopicReaderSync: """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + return self._caller.unsafe_call_with_result(self._async_reader.commit_with_ack(mess), timeout) def async_commit_with_ack( @@ -178,6 +188,9 @@ class TopicReaderSync: """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + return self._caller.unsafe_call_with_future(self._async_reader.commit_with_ack(mess)) def close(self, *, flush: bool = True, timeout: TimeoutType = None): diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py index f1ca68d1cf0..b691a1b111c 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/pool.py +++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py @@ -142,7 +142,7 @@ class QuerySessionPool: """Special interface to execute a bunch of commands with transaction in a safe, retriable way. :param callee: A function, that works with session. - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py index 0561de8c391..fe857878a54 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/session.py +++ b/contrib/python/ydb/py3/ydb/aio/query/session.py @@ -117,15 +117,22 @@ class QuerySession(BaseQuerySession): exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> AsyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode:NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -133,10 +140,11 @@ class QuerySession(BaseQuerySession): stream_it = await self._execute_call( query=query, + parameters=parameters, commit_tx=True, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) @@ -147,6 +155,7 @@ class QuerySession(BaseQuerySession): rpc_state=None, response_pb=resp, session_state=self._state, + session=self, settings=self._settings, ), ) diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py index f0547e5f01f..c9a6e445c93 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py @@ -29,7 +29,7 @@ class QueryTxContext(BaseQueryTxContext): :param driver: A driver instance :param session_state: A state of session - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -142,21 +142,28 @@ class QueryTxContext(BaseQueryTxContext): exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> AsyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. - :param exec_mode: Exec mode of the query, which is a one from the following choises: + :param exec_mode: Exec mode of the query, which is a one from the following choices: 1) QueryExecMode.EXECUTE, which is default; 2) QueryExecMode.EXPLAIN; 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode:NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -164,10 +171,11 @@ class QueryTxContext(BaseQueryTxContext): stream_it = await self._execute_call( query=query, + parameters=parameters, commit_tx=commit_tx, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) diff --git a/contrib/python/ydb/py3/ydb/iam/auth.py b/contrib/python/ydb/py3/ydb/iam/auth.py index c4096c096bf..688dededfca 100644 --- a/contrib/python/ydb/py3/ydb/iam/auth.py +++ b/contrib/python/ydb/py3/ydb/iam/auth.py @@ -3,7 +3,7 @@ from ydb import credentials, tracing import grpc import time import abc -from datetime import datetime +from datetime import datetime, timezone import json import os @@ -43,8 +43,8 @@ YANDEX_CLOUD_JWT_ALGORITHM = "PS256" def get_jwt(account_id, access_key_id, private_key, jwt_expiration_timeout, algorithm, token_service_url, subject=None): assert jwt is not None, "Install pyjwt library to use jwt tokens" now = time.time() - now_utc = datetime.utcfromtimestamp(now) - exp_utc = datetime.utcfromtimestamp(now + jwt_expiration_timeout) + now_utc = datetime.fromtimestamp(now, timezone.utc) + exp_utc = datetime.fromtimestamp(now + jwt_expiration_timeout, timezone.utc) payload = { "iss": account_id, "aud": token_service_url, diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py index 59dd799294b..76436f98353 100644 --- a/contrib/python/ydb/py3/ydb/query/__init__.py +++ b/contrib/python/ydb/py3/ydb/query/__init__.py @@ -7,6 +7,7 @@ __all__ = [ "QuerySessionPool", "QueryClientSettings", "QuerySession", + "QueryStatsMode", "QueryTxContext", ] @@ -14,6 +15,7 @@ import logging from .base import ( QueryClientSettings, + QueryStatsMode, ) from .session import QuerySession diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py index a5ebedd95b3..52a6312e3be 100644 --- a/contrib/python/ydb/py3/ydb/query/base.py +++ b/contrib/python/ydb/py3/ydb/query/base.py @@ -25,6 +25,7 @@ from ydb._grpc.grpcwrapper.common_utils import to_thread if typing.TYPE_CHECKING: from .transaction import BaseQueryTxContext + from .session import BaseQuerySession class QuerySyntax(enum.IntEnum): @@ -41,7 +42,7 @@ class QueryExecMode(enum.IntEnum): EXECUTE = 50 -class StatsMode(enum.IntEnum): +class QueryStatsMode(enum.IntEnum): UNSPECIFIED = 0 NONE = 10 BASIC = 20 @@ -132,12 +133,13 @@ def create_execute_query_request( tx_mode: Optional[BaseQueryTxMode], syntax: Optional[QuerySyntax], exec_mode: Optional[QueryExecMode], + stats_mode: Optional[QueryStatsMode], parameters: Optional[dict], concurrent_result_sets: Optional[bool], ) -> ydb_query.ExecuteQueryRequest: syntax = QuerySyntax.YQL_V1 if not syntax else syntax exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode - stats_mode = StatsMode.NONE # TODO: choise is not supported yet + stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode tx_control = None if not tx_id and not tx_mode: @@ -189,6 +191,7 @@ def wrap_execute_query_response( response_pb: _apis.ydb_query.ExecuteQueryResponsePart, session_state: IQuerySessionState, tx: Optional["BaseQueryTxContext"] = None, + session: Optional["BaseQuerySession"] = None, commit_tx: Optional[bool] = False, settings: Optional[QueryClientSettings] = None, ) -> convert.ResultSet: @@ -198,6 +201,12 @@ def wrap_execute_query_response( elif tx and response_pb.tx_meta and not tx.tx_id: tx._move_to_beginned(response_pb.tx_meta.id) + if response_pb.HasField("exec_stats"): + if tx is not None: + tx._last_query_stats = response_pb.exec_stats + if session is not None: + session._last_query_stats = response_pb.exec_stats + if response_pb.HasField("result_set"): return convert.ResultSet.from_message(response_pb.result_set, settings) diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index b25f7db855c..1cf95ac0d13 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -151,7 +151,7 @@ class QuerySessionPool: """Special interface to execute a bunch of commands with transaction in a safe, retriable way. :param callee: A function, that works with session. - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py index 382c922d5e6..3cc6c13d4e9 100644 --- a/contrib/python/ydb/py3/ydb/query/session.py +++ b/contrib/python/ydb/py3/ydb/query/session.py @@ -147,6 +147,12 @@ class BaseQuerySession: .with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT) ) + self._last_query_stats = None + + @property + def last_query_stats(self): + return self._last_query_stats + def _get_client_settings( self, driver: common_utils.SupportedDriverType, @@ -189,22 +195,26 @@ class BaseQuerySession: def _execute_call( self, query: str, + parameters: dict = None, commit_tx: bool = False, syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, - parameters: dict = None, + stats_mode: Optional[base.QueryStatsMode] = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: + self._last_query_stats = None + request = base.create_execute_query_request( query=query, - session_id=self._state.session_id, + parameters=parameters, commit_tx=commit_tx, + session_id=self._state.session_id, tx_mode=None, tx_id=None, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, ) @@ -293,7 +303,7 @@ class QuerySession(BaseQuerySession): def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContext: """Creates a transaction context manager with specified transaction mode. - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -321,15 +331,22 @@ class QuerySession(BaseQuerySession): exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> base.SyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode:NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -337,10 +354,11 @@ class QuerySession(BaseQuerySession): stream_it = self._execute_call( query=query, + parameters=parameters, commit_tx=True, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) @@ -351,6 +369,7 @@ class QuerySession(BaseQuerySession): rpc_state=None, response_pb=resp, session_state=self._state, + session=self, settings=self._settings, ), ) diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py index ae7642dbe21..008ac7c404f 100644 --- a/contrib/python/ydb/py3/ydb/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/query/transaction.py @@ -197,7 +197,7 @@ class BaseQueryTxContext(base.CallbackHandler): :param driver: A driver instance :param session_state: A state of session - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -210,6 +210,7 @@ class BaseQueryTxContext(base.CallbackHandler): self.session = session self._prev_stream = None self._external_error = None + self._last_query_stats = None @property def session_id(self) -> str: @@ -229,6 +230,10 @@ class BaseQueryTxContext(base.CallbackHandler): """ return self._tx_state.tx_id + @property + def last_query_stats(self): + return self._last_query_stats + def _tx_identity(self) -> _ydb_topic.TransactionIdentity: if not self.tx_id: raise RuntimeError("Unable to get tx identity without started tx.") @@ -283,25 +288,29 @@ class BaseQueryTxContext(base.CallbackHandler): def _execute_call( self, query: str, + parameters: Optional[dict], commit_tx: Optional[bool], syntax: Optional[base.QuerySyntax], exec_mode: Optional[base.QueryExecMode], - parameters: Optional[dict], + stats_mode: Optional[base.QueryStatsMode], concurrent_result_sets: Optional[bool], settings: Optional[BaseRequestSettings], ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: self._tx_state._check_tx_ready_to_use() self._check_external_error_set() + self._last_query_stats = None + request = base.create_execute_query_request( query=query, - session_id=self._session_state.session_id, + parameters=parameters, commit_tx=commit_tx, + session_id=self._session_state.session_id, tx_id=self._tx_state.tx_id, tx_mode=self._tx_state.tx_mode, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, ) @@ -338,7 +347,7 @@ class QueryTxContext(BaseQueryTxContext): :param driver: A driver instance :param session_state: A state of session - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -451,22 +460,29 @@ class QueryTxContext(BaseQueryTxContext): exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> base.SyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. - :param exec_mode: Exec mode of the query, which is a one from the following choises: + :param exec_mode: Exec mode of the query, which is a one from the following choices: 1) QueryExecMode.EXECUTE, which is default; 2) QueryExecMode.EXPLAIN; 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; :param settings: An additional request settings BaseRequestSettings; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode:NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -477,6 +493,7 @@ class QueryTxContext(BaseQueryTxContext): commit_tx=commit_tx, syntax=syntax, exec_mode=exec_mode, + stats_mode=stats_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, settings=settings, diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py index a501f9d2750..ceb82efb5dc 100644 --- a/contrib/python/ydb/py3/ydb/topic.py +++ b/contrib/python/ydb/py3/ydb/topic.py @@ -16,6 +16,7 @@ __all__ = [ "TopicReader", "TopicReaderAsyncIO", "TopicReaderBatch", + "TopicReaderEvents", "TopicReaderMessage", "TopicReaderSelector", "TopicReaderSettings", @@ -42,6 +43,8 @@ from . import aio, Credentials, _apis, issues from . import driver +from ._topic_reader import events as TopicReaderEvents + from ._topic_reader.datatypes import ( PublicBatch as TopicReaderBatch, PublicMessage as TopicReaderMessage, @@ -52,7 +55,9 @@ from ._topic_reader.topic_reader import ( PublicTopicSelector as TopicReaderSelector, ) -from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader +from ._topic_reader.topic_reader_sync import ( + TopicReaderSync as TopicReader, +) from ._topic_reader.topic_reader_asyncio import ( PublicAsyncIOReader as TopicReaderAsyncIO, @@ -240,7 +245,7 @@ class TopicClientAsyncIO: def reader( self, topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]], - consumer: str, + consumer: Optional[str], buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes # the func will be called from multiply threads in parallel @@ -249,6 +254,7 @@ class TopicClientAsyncIO: # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. + event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -257,6 +263,23 @@ class TopicClientAsyncIO: args = locals().copy() del args["self"] + if consumer == "": + raise issues.Error( + "Consumer name could not be empty! To use reader without consumer specify consumer as None." + ) + + if consumer is None: + if not isinstance(topic, TopicReaderSelector) or topic.partitions is None: + raise issues.Error( + "To use reader without consumer it is required to specify partition_ids in topic selector." + ) + + if event_handler is None: + raise issues.Error( + "To use reader without consumer it is required to specify event_handler with " + "on_partition_get_start_offset method." + ) + settings = TopicReaderSettings(**args) return TopicReaderAsyncIO(self._driver, settings, _parent=self) @@ -317,6 +340,21 @@ class TopicClientAsyncIO: return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self) + async def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None: + req = _ydb_topic.CommitOffsetRequest( + path=path, + consumer=consumer, + partition_id=partition_id, + offset=offset, + ) + + await self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.CommitOffset, + _wrap_operation, + ) + def close(self): if self._closed: return @@ -484,7 +522,7 @@ class TopicClient: def reader( self, topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]], - consumer: str, + consumer: Optional[str], buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes # the func will be called from multiply threads in parallel @@ -493,13 +531,30 @@ class TopicClient: # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. + event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReader: if not decoder_executor: decoder_executor = self._executor args = locals().copy() del args["self"] - self._check_closed() + + if consumer == "": + raise issues.Error( + "Consumer name could not be empty! To use reader without consumer specify consumer as None." + ) + + if consumer is None: + if not isinstance(topic, TopicReaderSelector) or topic.partitions is None: + raise issues.Error( + "To use reader without consumer it is required to specify partition_ids in topic selector." + ) + + if event_handler is None: + raise issues.Error( + "To use reader without consumer it is required to specify event_handler with " + "on_partition_get_start_offset method." + ) settings = TopicReaderSettings(**args) @@ -563,6 +618,21 @@ class TopicClient: return TopicTxWriter(tx, self._driver, settings, _parent=self) + def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None: + req = _ydb_topic.CommitOffsetRequest( + path=path, + consumer=consumer, + partition_id=partition_id, + offset=offset, + ) + + self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.CommitOffset, + _wrap_operation, + ) + def close(self): if self._closed: return diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 4a5c580f99f..6b71007009d 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.20.1" +VERSION = "3.21.0" |