diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-08-21 15:52:54 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-08-21 16:01:56 +0300 |
commit | 06068c509279029ceaa73fd5f3a0d43e99e16f12 (patch) | |
tree | c36276159e845be170bcc74ec925f4f77bc5ef2e /contrib | |
parent | de494aa10120184ac260727282f2e848b39d7386 (diff) | |
download | ydb-06068c509279029ceaa73fd5f3a0d43e99e16f12.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib')
-rw-r--r-- | contrib/python/ydb/py3/.dist-info/METADATA | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ya.make | 7 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py | 9 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/__init__.py | 1 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/_utilities.py | 3 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/__init__.py | 7 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/base.py | 11 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/pool.py | 108 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/session.py | 144 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/transaction.py | 153 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/__init__.py | 7 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/base.py | 264 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/pool.py | 26 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/session.py | 24 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/transaction.py | 108 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/retries.py | 25 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/ydb_version.py | 2 |
17 files changed, 602 insertions, 299 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 5b155408fc..b1c2dcdaa2 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.15.0 +Version: 3.16.0 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index cb2a88454a..bc7c34dd4d 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.15.0) +VERSION(3.16.0) LICENSE(Apache-2.0) @@ -57,6 +57,11 @@ PY_SRCS( ydb/aio/iam.py ydb/aio/oauth2_token_exchange.py ydb/aio/pool.py + ydb/aio/query/__init__.py + ydb/aio/query/base.py + ydb/aio/query/pool.py + ydb/aio/query/session.py + ydb/aio/query/transaction.py ydb/aio/resolver.py ydb/aio/scheme.py ydb/aio/table.py diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py index 66ef0a8c85..95a5744313 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py @@ -24,7 +24,8 @@ from google.protobuf.message import Message from google.protobuf.duration_pb2 import Duration as ProtoDuration from google.protobuf.timestamp_pb2 import Timestamp as ProtoTimeStamp -import ydb.aio +from ...driver import Driver +from ...aio.driver import Driver as DriverIO try: from ydb.public.api.protos import ydb_topic_pb2, ydb_issue_message_pb2 @@ -141,7 +142,7 @@ class IGrpcWrapperAsyncIO(abc.ABC): ... -SupportedDriverType = Union[ydb.Driver, ydb.aio.Driver] +SupportedDriverType = Union[Driver, DriverIO] class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO): @@ -180,7 +181,7 @@ class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO): if self._wait_executor: self._wait_executor.shutdown(wait) - async def _start_asyncio_driver(self, driver: ydb.aio.Driver, stub, method): + async def _start_asyncio_driver(self, driver: DriverIO, stub, method): requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc) stream_call = await driver( requests_iterator, @@ -190,7 +191,7 @@ class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO): self._stream_call = stream_call self.from_server_grpc = stream_call.__aiter__() - async def _start_sync_driver(self, driver: ydb.Driver, stub, method): + async def _start_sync_driver(self, driver: Driver, stub, method): requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc) self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) diff --git a/contrib/python/ydb/py3/ydb/aio/__init__.py b/contrib/python/ydb/py3/ydb/aio/__init__.py index acc44db57a..0e7d4e747a 100644 --- a/contrib/python/ydb/py3/ydb/aio/__init__.py +++ b/contrib/python/ydb/py3/ydb/aio/__init__.py @@ -1,2 +1,3 @@ from .driver import Driver # noqa from .table import SessionPool, retry_operation # noqa +from .query import QuerySessionPoolAsync, QuerySessionAsync # noqa diff --git a/contrib/python/ydb/py3/ydb/aio/_utilities.py b/contrib/python/ydb/py3/ydb/aio/_utilities.py index 10cbead667..454378b0d4 100644 --- a/contrib/python/ydb/py3/ydb/aio/_utilities.py +++ b/contrib/python/ydb/py3/ydb/aio/_utilities.py @@ -7,6 +7,9 @@ class AsyncResponseIterator(object): self.it.cancel() return self + def __iter__(self): + return self + def __aiter__(self): return self diff --git a/contrib/python/ydb/py3/ydb/aio/query/__init__.py b/contrib/python/ydb/py3/ydb/aio/query/__init__.py new file mode 100644 index 0000000000..829d7b54cf --- /dev/null +++ b/contrib/python/ydb/py3/ydb/aio/query/__init__.py @@ -0,0 +1,7 @@ +__all__ = [ + "QuerySessionPoolAsync", + "QuerySessionAsync", +] + +from .pool import QuerySessionPoolAsync +from .session import QuerySessionAsync diff --git a/contrib/python/ydb/py3/ydb/aio/query/base.py b/contrib/python/ydb/py3/ydb/aio/query/base.py new file mode 100644 index 0000000000..3800ce3d4f --- /dev/null +++ b/contrib/python/ydb/py3/ydb/aio/query/base.py @@ -0,0 +1,11 @@ +from .. import _utilities + + +class AsyncResponseContextIterator(_utilities.AsyncResponseIterator): + async def __aenter__(self) -> "AsyncResponseContextIterator": + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + # To close stream on YDB it is necessary to scroll through it to the end + async for _ in self: + pass diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py new file mode 100644 index 0000000000..f91f7465e4 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py @@ -0,0 +1,108 @@ +import logging +from typing import ( + Callable, + Optional, + List, +) + +from .session import ( + QuerySessionAsync, +) +from ...retries import ( + RetrySettings, + retry_operation_async, +) +from ... import convert +from ..._grpc.grpcwrapper import common_utils + +logger = logging.getLogger(__name__) + + +class QuerySessionPoolAsync: + """QuerySessionPoolAsync is an object to simplify operations with sessions of Query Service.""" + + def __init__(self, driver: common_utils.SupportedDriverType): + """ + :param driver: A driver instance + """ + + logger.warning("QuerySessionPoolAsync is an experimental API, which could be changed.") + self._driver = driver + + def checkout(self) -> "SimpleQuerySessionCheckoutAsync": + """WARNING: This API is experimental and could be changed. + Return a Session context manager, that opens session on enter and closes session on exit. + """ + + return SimpleQuerySessionCheckoutAsync(self) + + async def retry_operation_async( + self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs + ): + """WARNING: This API is experimental and could be changed. + Special interface to execute a bunch of commands with session in a safe, retriable way. + + :param callee: A function, that works with session. + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + async def wrapped_callee(): + async with self.checkout() as session: + return await callee(session, *args, **kwargs) + + return await retry_operation_async(wrapped_callee, retry_settings) + + async def execute_with_retries( + self, + query: str, + parameters: Optional[dict] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ) -> List[convert.ResultSet]: + """WARNING: This API is experimental and could be changed. + Special interface to execute a one-shot queries in a safe, retriable way. + Note: this method loads all data from stream before return, do not use this + method with huge read queries. + + :param query: A query, yql or sql text. + :param parameters: dict with parameters and YDB types; + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + async def wrapped_callee(): + async with self.checkout() as session: + it = await session.execute(query, parameters, *args, **kwargs) + return [result_set async for result_set in it] + + return await retry_operation_async(wrapped_callee, retry_settings) + + async def stop(self, timeout=None): + pass # TODO: implement + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.stop() + + +class SimpleQuerySessionCheckoutAsync: + def __init__(self, pool: QuerySessionPoolAsync): + self._pool = pool + self._session = QuerySessionAsync(pool._driver) + + async def __aenter__(self) -> QuerySessionAsync: + await self._session.create() + return self._session + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self._session.delete() diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py new file mode 100644 index 0000000000..627a41d895 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/aio/query/session.py @@ -0,0 +1,144 @@ +import asyncio + +from typing import ( + Optional, +) + +from .base import AsyncResponseContextIterator +from .transaction import QueryTxContextAsync +from .. import _utilities +from ... import issues +from ..._grpc.grpcwrapper import common_utils +from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public + +from ...query import base +from ...query.session import ( + BaseQuerySession, + QuerySessionStateEnum, +) + + +class QuerySessionAsync(BaseQuerySession): + """Session object for Query Service. It is not recommended to control + session's lifecycle manually - use a QuerySessionPool is always a better choise. + """ + + _loop: asyncio.AbstractEventLoop + _status_stream: _utilities.AsyncResponseIterator = None + + def __init__( + self, + driver: common_utils.SupportedDriverType, + settings: Optional[base.QueryClientSettings] = None, + loop: asyncio.AbstractEventLoop = None, + ): + super(QuerySessionAsync, self).__init__(driver, settings) + self._loop = loop if loop is not None else asyncio.get_running_loop() + + async def _attach(self) -> None: + self._stream = await self._attach_call() + self._status_stream = _utilities.AsyncResponseIterator( + self._stream, + lambda response: common_utils.ServerStatus.from_proto(response), + ) + + first_response = await self._status_stream.next() + if first_response.status != issues.StatusCode.SUCCESS: + pass + + self._state.set_attached(True) + self._state._change_state(QuerySessionStateEnum.CREATED) + + self._loop.create_task(self._check_session_status_loop(), name="check session status task") + + async def _check_session_status_loop(self) -> None: + try: + async for status in self._status_stream: + if status.status != issues.StatusCode.SUCCESS: + self._state.reset() + self._state._change_state(QuerySessionStateEnum.CLOSED) + except Exception: + if not self._state._already_in(QuerySessionStateEnum.CLOSED): + self._state.reset() + self._state._change_state(QuerySessionStateEnum.CLOSED) + + async def delete(self) -> None: + """WARNING: This API is experimental and could be changed. + + Deletes a Session of Query Service on server side and releases resources. + + :return: None + """ + if self._state._already_in(QuerySessionStateEnum.CLOSED): + return + + self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED) + await self._delete_call() + self._stream.cancel() + + async def create(self) -> "QuerySessionAsync": + """WARNING: This API is experimental and could be changed. + + Creates a Session of Query Service on server side and attaches it. + + :return: QuerySessionSync object. + """ + if self._state._already_in(QuerySessionStateEnum.CREATED): + return + + self._state._check_invalid_transition(QuerySessionStateEnum.CREATED) + await self._create_call() + await self._attach() + + return self + + def transaction(self, tx_mode=None) -> QueryTxContextAsync: + self._state._check_session_ready_to_use() + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() + + return QueryTxContextAsync( + self._driver, + self._state, + self, + tx_mode, + ) + + async def execute( + self, + query: str, + parameters: dict = None, + syntax: base.QuerySyntax = None, + exec_mode: base.QueryExecMode = None, + concurrent_result_sets: bool = False, + ) -> AsyncResponseContextIterator: + """WARNING: This API is experimental and could be changed. + + Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. + :param syntax: Syntax of the query, which is a one from the following choises: + 1) QuerySyntax.YQL_V1, which is default; + 2) QuerySyntax.PG. + :param parameters: dict with parameters and YDB types; + :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + + :return: Iterator with result sets + """ + self._state._check_session_ready_to_use() + + stream_it = await self._execute_call( + query=query, + commit_tx=True, + syntax=syntax, + exec_mode=exec_mode, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + ) + + return AsyncResponseContextIterator( + stream_it, + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + settings=self._settings, + ), + ) diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py new file mode 100644 index 0000000000..e9993fccc2 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py @@ -0,0 +1,153 @@ +import logging +from typing import ( + Optional, +) + +from .base import AsyncResponseContextIterator +from ... import issues +from ...query import base +from ...query.transaction import ( + BaseQueryTxContext, + QueryTxStateEnum, +) + +logger = logging.getLogger(__name__) + + +class QueryTxContextAsync(BaseQueryTxContext): + async def __aenter__(self) -> "QueryTxContextAsync": + """ + Enters a context manager and returns a transaction + + :return: A transaction instance + """ + return self + + async def __aexit__(self, *args, **kwargs): + """ + Closes a transaction context manager and rollbacks transaction if + it is not finished explicitly + """ + await self._ensure_prev_stream_finished() + if self._tx_state._state == QueryTxStateEnum.BEGINED: + # It's strictly recommended to close transactions directly + # by using commit_tx=True flag while executing statement or by + # .commit() or .rollback() methods, but here we trying to do best + # effort to avoid useless open transactions + logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id) + try: + await self.rollback() + except issues.Error: + logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id) + + async def _ensure_prev_stream_finished(self) -> None: + if self._prev_stream is not None: + async with self._prev_stream: + pass + self._prev_stream = None + + async def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextAsync": + """WARNING: This API is experimental and could be changed. + + Explicitly begins a transaction + + :param settings: A request settings + + :return: None or exception if begin is failed + """ + await self._begin_call(settings) + return self + + async def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: + """WARNING: This API is experimental and could be changed. + + Calls commit on a transaction if it is open otherwise is no-op. If transaction execution + failed then this method raises PreconditionFailed. + + :param settings: A request settings + + :return: A committed transaction or exception if commit is failed + """ + if self._tx_state._already_in(QueryTxStateEnum.COMMITTED): + return + + if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: + self._tx_state._change_state(QueryTxStateEnum.COMMITTED) + return + + await self._ensure_prev_stream_finished() + + await self._commit_call(settings) + + async def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: + """WARNING: This API is experimental and could be changed. + + Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution + failed then this method raises PreconditionFailed. + + :param settings: A request settings + + :return: A committed transaction or exception if commit is failed + """ + if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED): + return + + if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: + self._tx_state._change_state(QueryTxStateEnum.ROLLBACKED) + return + + await self._ensure_prev_stream_finished() + + await self._rollback_call(settings) + + async def execute( + self, + query: str, + parameters: Optional[dict] = None, + commit_tx: Optional[bool] = False, + syntax: Optional[base.QuerySyntax] = None, + exec_mode: Optional[base.QueryExecMode] = None, + concurrent_result_sets: Optional[bool] = False, + settings: Optional[base.QueryClientSettings] = None, + ) -> AsyncResponseContextIterator: + """WARNING: This API is experimental and could be changed. + + Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. + :param parameters: dict with parameters and YDB types; + :param commit_tx: A special flag that allows transaction commit. + :param syntax: Syntax of the query, which is a one from the following choises: + 1) QuerySyntax.YQL_V1, which is default; + 2) QuerySyntax.PG. + :param exec_mode: Exec mode of the query, which is a one from the following choises: + 1) QueryExecMode.EXECUTE, which is default; + 2) QueryExecMode.EXPLAIN; + 3) QueryExecMode.VALIDATE; + 4) QueryExecMode.PARSE. + :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + + :return: Iterator with result sets + """ + await self._ensure_prev_stream_finished() + + stream_it = await self._execute_call( + query=query, + commit_tx=commit_tx, + syntax=syntax, + exec_mode=exec_mode, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + ) + + settings = settings if settings is not None else self.session._settings + self._prev_stream = AsyncResponseContextIterator( + stream_it, + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + tx=self, + commit_tx=commit_tx, + settings=settings, + ), + ) + return self._prev_stream diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py index eb967abc20..40e512cd6b 100644 --- a/contrib/python/ydb/py3/ydb/query/__init__.py +++ b/contrib/python/ydb/py3/ydb/query/__init__.py @@ -11,13 +11,12 @@ __all__ = [ import logging from .base import ( - IQueryClient, - SupportedDriverType, QueryClientSettings, ) from .session import QuerySessionSync +from .._grpc.grpcwrapper import common_utils from .._grpc.grpcwrapper.ydb_query_public_types import ( QueryOnlineReadOnly, QuerySerializableReadWrite, @@ -30,8 +29,8 @@ from .pool import QuerySessionPool logger = logging.getLogger(__name__) -class QueryClientSync(IQueryClient): - def __init__(self, driver: SupportedDriverType, query_client_settings: QueryClientSettings = None): +class QueryClientSync: + def __init__(self, driver: common_utils.SupportedDriverType, query_client_settings: QueryClientSettings = None): logger.warning("QueryClientSync is an experimental API, which could be changed.") self._driver = driver self._settings = query_client_settings diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py index e08d9f52d0..55087d0c4e 100644 --- a/contrib/python/ydb/py3/ydb/query/base.py +++ b/contrib/python/ydb/py3/ydb/query/base.py @@ -2,14 +2,11 @@ import abc import enum import functools +import typing from typing import ( - Iterator, Optional, ) -from .._grpc.grpcwrapper.common_utils import ( - SupportedDriverType, -) from .._grpc.grpcwrapper import ydb_query from .._grpc.grpcwrapper.ydb_query_public_types import ( BaseQueryTxMode, @@ -20,6 +17,9 @@ from .. import issues from .. import _utilities from .. import _apis +if typing.TYPE_CHECKING: + from .transaction import BaseQueryTxContext + class QuerySyntax(enum.IntEnum): UNSPECIFIED = 0 @@ -48,12 +48,38 @@ class SyncResponseContextIterator(_utilities.SyncResponseIterator): return self def __exit__(self, exc_type, exc_val, exc_tb): + # To close stream on YDB it is necessary to scroll through it to the end for _ in self: pass class QueryClientSettings: - pass + def __init__(self): + self._native_datetime_in_result_sets = True + self._native_date_in_result_sets = True + self._native_json_in_result_sets = True + self._native_interval_in_result_sets = True + self._native_timestamp_in_result_sets = True + + def with_native_timestamp_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_timestamp_in_result_sets = enabled + return self + + def with_native_interval_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_interval_in_result_sets = enabled + return self + + def with_native_json_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_json_in_result_sets = enabled + return self + + def with_native_date_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_date_in_result_sets = enabled + return self + + def with_native_datetime_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_datetime_in_result_sets = enabled + return self class IQuerySessionState(abc.ABC): @@ -92,229 +118,6 @@ class IQuerySessionState(abc.ABC): pass -class IQuerySession(abc.ABC): - """Session object for Query Service. It is not recommended to control - session's lifecycle manually - use a QuerySessionPool is always a better choise. - """ - - @abc.abstractmethod - def __init__(self, driver: SupportedDriverType, settings: Optional[QueryClientSettings] = None): - pass - - @abc.abstractmethod - def create(self) -> "IQuerySession": - """WARNING: This API is experimental and could be changed. - - Creates a Session of Query Service on server side and attaches it. - - :return: Session object. - """ - pass - - @abc.abstractmethod - def delete(self) -> None: - """WARNING: This API is experimental and could be changed. - - Deletes a Session of Query Service on server side and releases resources. - - :return: None - """ - pass - - @abc.abstractmethod - def transaction(self, tx_mode: Optional[BaseQueryTxMode] = None) -> "IQueryTxContext": - """WARNING: This API is experimental and could be changed. - - Creates a transaction context manager with specified transaction mode. - - :param tx_mode: Transaction mode, which is a one from the following choises: - 1) QuerySerializableReadWrite() which is default mode; - 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); - 3) QuerySnapshotReadOnly(); - 4) QueryStaleReadOnly(). - - :return: transaction context manager. - """ - pass - - @abc.abstractmethod - def execute( - self, - query: str, - syntax: Optional[QuerySyntax] = None, - exec_mode: Optional[QueryExecMode] = None, - parameters: Optional[dict] = None, - concurrent_result_sets: Optional[bool] = False, - ) -> Iterator: - """WARNING: This API is experimental and could be changed. - - Sends a query to Query Service - :param query: (YQL or SQL text) to be executed. - :param syntax: Syntax of the query, which is a one from the following choises: - 1) QuerySyntax.YQL_V1, which is default; - 2) QuerySyntax.PG. - :param parameters: dict with parameters and YDB types; - :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; - - :return: Iterator with result sets - """ - - -class IQueryTxContext(abc.ABC): - """ - An object that provides a simple transaction context manager that allows statements execution - in a transaction. You don't have to open transaction explicitly, because context manager encapsulates - transaction control logic, and opens new transaction if: - 1) By explicit .begin(); - 2) On execution of a first statement, which is strictly recommended method, because that avoids - useless round trip - - This context manager is not thread-safe, so you should not manipulate on it concurrently. - """ - - @abc.abstractmethod - def __init__( - self, - driver: SupportedDriverType, - session_state: IQuerySessionState, - session: IQuerySession, - tx_mode: BaseQueryTxMode, - ): - """ - An object that provides a simple transaction context manager that allows statements execution - in a transaction. You don't have to open transaction explicitly, because context manager encapsulates - transaction control logic, and opens new transaction if: - - 1) By explicit .begin() method; - 2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip - - This context manager is not thread-safe, so you should not manipulate on it concurrently. - - :param driver: A driver instance - :param session_state: A state of session - :param tx_mode: Transaction mode, which is a one from the following choises: - 1) QuerySerializableReadWrite() which is default mode; - 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); - 3) QuerySnapshotReadOnly(); - 4) QueryStaleReadOnly(). - """ - pass - - @abc.abstractmethod - def __enter__(self) -> "IQueryTxContext": - """ - Enters a context manager and returns a transaction - - :return: A transaction instance - """ - pass - - @abc.abstractmethod - def __exit__(self, *args, **kwargs): - """ - Closes a transaction context manager and rollbacks transaction if - it is not finished explicitly - """ - pass - - @property - @abc.abstractmethod - def session_id(self) -> str: - """ - A transaction's session id - - :return: A transaction's session id - """ - pass - - @property - @abc.abstractmethod - def tx_id(self) -> Optional[str]: - """ - Returns an id of open transaction or None otherwise - - :return: An id of open transaction or None otherwise - """ - pass - - @abc.abstractmethod - def begin(self, settings: Optional[QueryClientSettings] = None) -> None: - """WARNING: This API is experimental and could be changed. - - Explicitly begins a transaction - - :param settings: A request settings - - :return: None or exception if begin is failed - """ - pass - - @abc.abstractmethod - def commit(self, settings: Optional[QueryClientSettings] = None) -> None: - """WARNING: This API is experimental and could be changed. - - Calls commit on a transaction if it is open. If transaction execution - failed then this method raises PreconditionFailed. - - :param settings: A request settings - - :return: None or exception if commit is failed - """ - pass - - @abc.abstractmethod - def rollback(self, settings: Optional[QueryClientSettings] = None) -> None: - """WARNING: This API is experimental and could be changed. - - Calls rollback on a transaction if it is open. If transaction execution - failed then this method raises PreconditionFailed. - - :param settings: A request settings - - :return: None or exception if rollback is failed - """ - pass - - @abc.abstractmethod - def execute( - self, - query: str, - commit_tx: Optional[bool] = False, - syntax: Optional[QuerySyntax] = None, - exec_mode: Optional[QueryExecMode] = None, - parameters: Optional[dict] = None, - concurrent_result_sets: Optional[bool] = False, - ) -> Iterator: - """WARNING: This API is experimental and could be changed. - - Sends a query to Query Service - :param query: (YQL or SQL text) to be executed. - :param commit_tx: A special flag that allows transaction commit. - :param syntax: Syntax of the query, which is a one from the following choises: - 1) QuerySyntax.YQL_V1, which is default; - 2) QuerySyntax.PG. - :param exec_mode: Exec mode of the query, which is a one from the following choises: - 1) QueryExecMode.EXECUTE, which is default; - 2) QueryExecMode.EXPLAIN; - 3) QueryExecMode.VALIDATE; - 4) QueryExecMode.PARSE. - :param parameters: dict with parameters and YDB types; - :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; - - :return: Iterator with result sets - """ - pass - - -class IQueryClient(abc.ABC): - def __init__(self, driver: SupportedDriverType, query_client_settings: Optional[QueryClientSettings] = None): - pass - - @abc.abstractmethod - def session(self) -> IQuerySession: - pass - - def create_execute_query_request( query: str, session_id: str, @@ -365,15 +168,16 @@ def create_execute_query_request( def wrap_execute_query_response( rpc_state: RpcState, response_pb: _apis.ydb_query.ExecuteQueryResponsePart, - tx: Optional[IQueryTxContext] = None, + tx: Optional["BaseQueryTxContext"] = None, commit_tx: Optional[bool] = False, + settings: Optional[QueryClientSettings] = None, ) -> convert.ResultSet: issues._process_response(response_pb) if tx and response_pb.tx_meta and not tx.tx_id: tx._move_to_beginned(response_pb.tx_meta.id) if tx and commit_tx: tx._move_to_commited() - return convert.ResultSet.from_message(response_pb.result_set) + return convert.ResultSet.from_message(response_pb.result_set, settings) def bad_session_handler(func): diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index e7514cdf76..afe39f0623 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -5,7 +5,6 @@ from typing import ( List, ) -from . import base from .session import ( QuerySessionSync, ) @@ -14,6 +13,8 @@ from ..retries import ( retry_operation_sync, ) from .. import convert +from .._grpc.grpcwrapper import common_utils + logger = logging.getLogger(__name__) @@ -21,7 +22,7 @@ logger = logging.getLogger(__name__) class QuerySessionPool: """QuerySessionPool is an object to simplify operations with sessions of Query Service.""" - def __init__(self, driver: base.SupportedDriverType): + def __init__(self, driver: common_utils.SupportedDriverType): """ :param driver: A driver instance """ @@ -55,7 +56,12 @@ class QuerySessionPool: return retry_operation_sync(wrapped_callee, retry_settings) def execute_with_retries( - self, query: str, retry_settings: Optional[RetrySettings] = None, *args, **kwargs + self, + query: str, + parameters: Optional[dict] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, ) -> List[convert.ResultSet]: """WARNING: This API is experimental and could be changed. Special interface to execute a one-shot queries in a safe, retriable way. @@ -63,6 +69,7 @@ class QuerySessionPool: method with huge read queries. :param query: A query, yql or sql text. + :param parameters: dict with parameters and YDB types; :param retry_settings: RetrySettings object. :return: Result sets or exception in case of execution errors. @@ -72,18 +79,27 @@ class QuerySessionPool: def wrapped_callee(): with self.checkout() as session: - it = session.execute(query, *args, **kwargs) + it = session.execute(query, parameters, *args, **kwargs) return [result_set for result_set in it] return retry_operation_sync(wrapped_callee, retry_settings) + def stop(self, timeout=None): + pass # TODO: implement + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + class SimpleQuerySessionCheckout: def __init__(self, pool: QuerySessionPool): self._pool = pool self._session = QuerySessionSync(pool._driver) - def __enter__(self) -> base.IQuerySession: + def __enter__(self) -> QuerySessionSync: self._session.create() return self._session diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py index d6034d348a..4b051dc16f 100644 --- a/contrib/python/ydb/py3/ydb/query/session.py +++ b/contrib/python/ydb/py3/ydb/query/session.py @@ -15,7 +15,7 @@ from .._grpc.grpcwrapper import common_utils from .._grpc.grpcwrapper import ydb_query as _ydb_query from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public -from .transaction import BaseQueryTxContext +from .transaction import QueryTxContextSync logger = logging.getLogger(__name__) @@ -126,12 +126,12 @@ def wrapper_delete_session( return session -class BaseQuerySession(base.IQuerySession): - _driver: base.SupportedDriverType +class BaseQuerySession: + _driver: common_utils.SupportedDriverType _settings: base.QueryClientSettings _state: QuerySessionState - def __init__(self, driver: base.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None): + def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None): self._driver = driver self._settings = settings if settings is not None else base.QueryClientSettings() self._state = QuerySessionState(settings) @@ -224,7 +224,9 @@ class QuerySessionSync(BaseQuerySession): self._state.reset() self._state._change_state(QuerySessionStateEnum.CLOSED) except Exception: - pass + if not self._state._already_in(QuerySessionStateEnum.CLOSED): + self._state.reset() + self._state._change_state(QuerySessionStateEnum.CLOSED) def delete(self) -> None: """WARNING: This API is experimental and could be changed. @@ -256,7 +258,7 @@ class QuerySessionSync(BaseQuerySession): return self - def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> base.IQueryTxContext: + def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContextSync: """WARNING: This API is experimental and could be changed. Creates a transaction context manager with specified transaction mode. @@ -273,7 +275,7 @@ class QuerySessionSync(BaseQuerySession): tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() - return BaseQueryTxContext( + return QueryTxContextSync( self._driver, self._state, self, @@ -283,9 +285,9 @@ class QuerySessionSync(BaseQuerySession): def execute( self, query: str, + parameters: dict = None, syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, - parameters: dict = None, concurrent_result_sets: bool = False, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. @@ -313,5 +315,9 @@ class QuerySessionSync(BaseQuerySession): return base.SyncResponseContextIterator( stream_it, - lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp), + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + settings=self._settings, + ), ) diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py index 0a49320293..750a94b0b5 100644 --- a/contrib/python/ydb/py3/ydb/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/query/transaction.py @@ -169,7 +169,7 @@ def wrap_tx_rollback_response( return tx -class BaseQueryTxContext(base.IQueryTxContext): +class BaseQueryTxContext: def __init__(self, driver, session_state, session, tx_mode): """ An object that provides a simple transaction context manager that allows statements execution @@ -196,31 +196,6 @@ class BaseQueryTxContext(base.IQueryTxContext): self.session = session self._prev_stream = None - def __enter__(self) -> "BaseQueryTxContext": - """ - Enters a context manager and returns a transaction - - :return: A transaction instance - """ - return self - - def __exit__(self, *args, **kwargs): - """ - Closes a transaction context manager and rollbacks transaction if - it is not finished explicitly - """ - self._ensure_prev_stream_finished() - if self._tx_state._state == QueryTxStateEnum.BEGINED: - # It's strictly recommended to close transactions directly - # by using commit_tx=True flag while executing statement or by - # .commit() or .rollback() methods, but here we trying to do best - # effort to avoid useless open transactions - logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id) - try: - self.rollback() - except issues.Error: - logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id) - @property def session_id(self) -> str: """ @@ -240,6 +215,8 @@ class BaseQueryTxContext(base.IQueryTxContext): return self._tx_state.tx_id def _begin_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED) + return self._driver( _create_begin_transaction_request(self._session_state, self._tx_state), _apis.QueryService.Stub, @@ -250,6 +227,8 @@ class BaseQueryTxContext(base.IQueryTxContext): ) def _commit_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED) + return self._driver( _create_commit_transaction_request(self._session_state, self._tx_state), _apis.QueryService.Stub, @@ -260,6 +239,8 @@ class BaseQueryTxContext(base.IQueryTxContext): ) def _rollback_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED) + return self._driver( _create_rollback_transaction_request(self._session_state, self._tx_state), _apis.QueryService.Stub, @@ -278,6 +259,8 @@ class BaseQueryTxContext(base.IQueryTxContext): parameters: dict = None, concurrent_result_sets: bool = False, ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: + self._tx_state._check_tx_ready_to_use() + request = base.create_execute_query_request( query=query, session_id=self._session_state.session_id, @@ -296,12 +279,6 @@ class BaseQueryTxContext(base.IQueryTxContext): _apis.QueryService.ExecuteQuery, ) - def _ensure_prev_stream_finished(self) -> None: - if self._prev_stream is not None: - for _ in self._prev_stream: - pass - self._prev_stream = None - def _move_to_beginned(self, tx_id: str) -> None: if self._tx_state._already_in(QueryTxStateEnum.BEGINED): return @@ -313,19 +290,52 @@ class BaseQueryTxContext(base.IQueryTxContext): return self._tx_state._change_state(QueryTxStateEnum.COMMITTED) - def begin(self, settings: Optional[base.QueryClientSettings] = None) -> None: + +class QueryTxContextSync(BaseQueryTxContext): + def __enter__(self) -> "BaseQueryTxContext": + """ + Enters a context manager and returns a transaction + + :return: A transaction instance + """ + return self + + def __exit__(self, *args, **kwargs): + """ + Closes a transaction context manager and rollbacks transaction if + it is not finished explicitly + """ + self._ensure_prev_stream_finished() + if self._tx_state._state == QueryTxStateEnum.BEGINED: + # It's strictly recommended to close transactions directly + # by using commit_tx=True flag while executing statement or by + # .commit() or .rollback() methods, but here we trying to do best + # effort to avoid useless open transactions + logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id) + try: + self.rollback() + except issues.Error: + logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id) + + def _ensure_prev_stream_finished(self) -> None: + if self._prev_stream is not None: + with self._prev_stream: + pass + self._prev_stream = None + + def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextSync": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction :param settings: A request settings - :return: None or exception if begin is failed + :return: Transaction object or exception if begin is failed """ - self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED) - self._begin_call(settings) + return self + def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: """WARNING: This API is experimental and could be changed. @@ -338,43 +348,51 @@ class BaseQueryTxContext(base.IQueryTxContext): """ if self._tx_state._already_in(QueryTxStateEnum.COMMITTED): return - self._ensure_prev_stream_finished() if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: self._tx_state._change_state(QueryTxStateEnum.COMMITTED) return - self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED) + self._ensure_prev_stream_finished() self._commit_call(settings) def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: + """WARNING: This API is experimental and could be changed. + + Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution + failed then this method raises PreconditionFailed. + + :param settings: A request settings + + :return: A committed transaction or exception if commit is failed + """ if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED): return - self._ensure_prev_stream_finished() - if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: self._tx_state._change_state(QueryTxStateEnum.ROLLBACKED) return - self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED) + self._ensure_prev_stream_finished() self._rollback_call(settings) def execute( self, query: str, + parameters: Optional[dict] = None, commit_tx: Optional[bool] = False, syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, - parameters: Optional[dict] = None, concurrent_result_sets: Optional[bool] = False, + settings: Optional[base.QueryClientSettings] = None, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. Sends a query to Query Service :param query: (YQL or SQL text) to be executed. + :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. :param syntax: Syntax of the query, which is a one from the following choises: 1) QuerySyntax.YQL_V1, which is default; @@ -384,13 +402,12 @@ class BaseQueryTxContext(base.IQueryTxContext): 2) QueryExecMode.EXPLAIN; 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. - :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param settings: An additional request settings QueryClientSettings; :return: Iterator with result sets """ self._ensure_prev_stream_finished() - self._tx_state._check_tx_ready_to_use() stream_it = self._execute_call( query=query, @@ -400,6 +417,8 @@ class BaseQueryTxContext(base.IQueryTxContext): parameters=parameters, concurrent_result_sets=concurrent_result_sets, ) + + settings = settings if settings is not None else self.session._settings self._prev_stream = base.SyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -407,6 +426,7 @@ class BaseQueryTxContext(base.IQueryTxContext): response_pb=resp, tx=self, commit_tx=commit_tx, + settings=settings, ), ) return self._prev_stream diff --git a/contrib/python/ydb/py3/ydb/retries.py b/contrib/python/ydb/py3/ydb/retries.py index 5d4f6e6a0f..c9c23b1a91 100644 --- a/contrib/python/ydb/py3/ydb/retries.py +++ b/contrib/python/ydb/py3/ydb/retries.py @@ -1,3 +1,4 @@ +import asyncio import random import time @@ -134,3 +135,27 @@ def retry_operation_sync(callee, retry_settings=None, *args, **kwargs): time.sleep(next_opt.timeout) else: return next_opt.result + + +async def retry_operation_async(callee, retry_settings=None, *args, **kwargs): # pylint: disable=W1113 + """ + The retry operation helper can be used to retry a coroutine that raises YDB specific + exceptions. + + :param callee: A coroutine to retry. + :param retry_settings: An instance of ydb.RetrySettings that describes how the coroutine + should be retried. If None, default instance of retry settings will be used. + :param args: A tuple with positional arguments to be passed into the coroutine. + :param kwargs: A dictionary with keyword arguments to be passed into the coroutine. + + Returns awaitable result of coroutine. If retries are not succussful exception is raised. + """ + opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs) + for next_opt in opt_generator: + if isinstance(next_opt, YdbRetryOperationSleepOpt): + await asyncio.sleep(next_opt.timeout) + else: + try: + return await next_opt.result + except BaseException as e: # pylint: disable=W0703 + next_opt.set_exception(e) diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 567cda12c7..76caab7b48 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.15.0" +VERSION = "3.16.0" |