aboutsummaryrefslogtreecommitdiffstats
path: root/contrib
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-08-21 15:52:54 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-08-21 16:01:56 +0300
commit06068c509279029ceaa73fd5f3a0d43e99e16f12 (patch)
treec36276159e845be170bcc74ec925f4f77bc5ef2e /contrib
parentde494aa10120184ac260727282f2e848b39d7386 (diff)
downloadydb-06068c509279029ceaa73fd5f3a0d43e99e16f12.tar.gz
Intermediate changes
Diffstat (limited to 'contrib')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make7
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py9
-rw-r--r--contrib/python/ydb/py3/ydb/aio/__init__.py1
-rw-r--r--contrib/python/ydb/py3/ydb/aio/_utilities.py3
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/__init__.py7
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/base.py11
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/pool.py108
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/session.py144
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/transaction.py153
-rw-r--r--contrib/python/ydb/py3/ydb/query/__init__.py7
-rw-r--r--contrib/python/ydb/py3/ydb/query/base.py264
-rw-r--r--contrib/python/ydb/py3/ydb/query/pool.py26
-rw-r--r--contrib/python/ydb/py3/ydb/query/session.py24
-rw-r--r--contrib/python/ydb/py3/ydb/query/transaction.py108
-rw-r--r--contrib/python/ydb/py3/ydb/retries.py25
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
17 files changed, 602 insertions, 299 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 5b155408fc..b1c2dcdaa2 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.15.0
+Version: 3.16.0
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index cb2a88454a..bc7c34dd4d 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.15.0)
+VERSION(3.16.0)
LICENSE(Apache-2.0)
@@ -57,6 +57,11 @@ PY_SRCS(
ydb/aio/iam.py
ydb/aio/oauth2_token_exchange.py
ydb/aio/pool.py
+ ydb/aio/query/__init__.py
+ ydb/aio/query/base.py
+ ydb/aio/query/pool.py
+ ydb/aio/query/session.py
+ ydb/aio/query/transaction.py
ydb/aio/resolver.py
ydb/aio/scheme.py
ydb/aio/table.py
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py
index 66ef0a8c85..95a5744313 100644
--- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py
@@ -24,7 +24,8 @@ from google.protobuf.message import Message
from google.protobuf.duration_pb2 import Duration as ProtoDuration
from google.protobuf.timestamp_pb2 import Timestamp as ProtoTimeStamp
-import ydb.aio
+from ...driver import Driver
+from ...aio.driver import Driver as DriverIO
try:
from ydb.public.api.protos import ydb_topic_pb2, ydb_issue_message_pb2
@@ -141,7 +142,7 @@ class IGrpcWrapperAsyncIO(abc.ABC):
...
-SupportedDriverType = Union[ydb.Driver, ydb.aio.Driver]
+SupportedDriverType = Union[Driver, DriverIO]
class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO):
@@ -180,7 +181,7 @@ class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO):
if self._wait_executor:
self._wait_executor.shutdown(wait)
- async def _start_asyncio_driver(self, driver: ydb.aio.Driver, stub, method):
+ async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc)
stream_call = await driver(
requests_iterator,
@@ -190,7 +191,7 @@ class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO):
self._stream_call = stream_call
self.from_server_grpc = stream_call.__aiter__()
- async def _start_sync_driver(self, driver: ydb.Driver, stub, method):
+ async def _start_sync_driver(self, driver: Driver, stub, method):
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
diff --git a/contrib/python/ydb/py3/ydb/aio/__init__.py b/contrib/python/ydb/py3/ydb/aio/__init__.py
index acc44db57a..0e7d4e747a 100644
--- a/contrib/python/ydb/py3/ydb/aio/__init__.py
+++ b/contrib/python/ydb/py3/ydb/aio/__init__.py
@@ -1,2 +1,3 @@
from .driver import Driver # noqa
from .table import SessionPool, retry_operation # noqa
+from .query import QuerySessionPoolAsync, QuerySessionAsync # noqa
diff --git a/contrib/python/ydb/py3/ydb/aio/_utilities.py b/contrib/python/ydb/py3/ydb/aio/_utilities.py
index 10cbead667..454378b0d4 100644
--- a/contrib/python/ydb/py3/ydb/aio/_utilities.py
+++ b/contrib/python/ydb/py3/ydb/aio/_utilities.py
@@ -7,6 +7,9 @@ class AsyncResponseIterator(object):
self.it.cancel()
return self
+ def __iter__(self):
+ return self
+
def __aiter__(self):
return self
diff --git a/contrib/python/ydb/py3/ydb/aio/query/__init__.py b/contrib/python/ydb/py3/ydb/aio/query/__init__.py
new file mode 100644
index 0000000000..829d7b54cf
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/aio/query/__init__.py
@@ -0,0 +1,7 @@
+__all__ = [
+ "QuerySessionPoolAsync",
+ "QuerySessionAsync",
+]
+
+from .pool import QuerySessionPoolAsync
+from .session import QuerySessionAsync
diff --git a/contrib/python/ydb/py3/ydb/aio/query/base.py b/contrib/python/ydb/py3/ydb/aio/query/base.py
new file mode 100644
index 0000000000..3800ce3d4f
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/aio/query/base.py
@@ -0,0 +1,11 @@
+from .. import _utilities
+
+
+class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
+ async def __aenter__(self) -> "AsyncResponseContextIterator":
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ # To close stream on YDB it is necessary to scroll through it to the end
+ async for _ in self:
+ pass
diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py
new file mode 100644
index 0000000000..f91f7465e4
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py
@@ -0,0 +1,108 @@
+import logging
+from typing import (
+ Callable,
+ Optional,
+ List,
+)
+
+from .session import (
+ QuerySessionAsync,
+)
+from ...retries import (
+ RetrySettings,
+ retry_operation_async,
+)
+from ... import convert
+from ..._grpc.grpcwrapper import common_utils
+
+logger = logging.getLogger(__name__)
+
+
+class QuerySessionPoolAsync:
+ """QuerySessionPoolAsync is an object to simplify operations with sessions of Query Service."""
+
+ def __init__(self, driver: common_utils.SupportedDriverType):
+ """
+ :param driver: A driver instance
+ """
+
+ logger.warning("QuerySessionPoolAsync is an experimental API, which could be changed.")
+ self._driver = driver
+
+ def checkout(self) -> "SimpleQuerySessionCheckoutAsync":
+ """WARNING: This API is experimental and could be changed.
+ Return a Session context manager, that opens session on enter and closes session on exit.
+ """
+
+ return SimpleQuerySessionCheckoutAsync(self)
+
+ async def retry_operation_async(
+ self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs
+ ):
+ """WARNING: This API is experimental and could be changed.
+ Special interface to execute a bunch of commands with session in a safe, retriable way.
+
+ :param callee: A function, that works with session.
+ :param retry_settings: RetrySettings object.
+
+ :return: Result sets or exception in case of execution errors.
+ """
+
+ retry_settings = RetrySettings() if retry_settings is None else retry_settings
+
+ async def wrapped_callee():
+ async with self.checkout() as session:
+ return await callee(session, *args, **kwargs)
+
+ return await retry_operation_async(wrapped_callee, retry_settings)
+
+ async def execute_with_retries(
+ self,
+ query: str,
+ parameters: Optional[dict] = None,
+ retry_settings: Optional[RetrySettings] = None,
+ *args,
+ **kwargs,
+ ) -> List[convert.ResultSet]:
+ """WARNING: This API is experimental and could be changed.
+ Special interface to execute a one-shot queries in a safe, retriable way.
+ Note: this method loads all data from stream before return, do not use this
+ method with huge read queries.
+
+ :param query: A query, yql or sql text.
+ :param parameters: dict with parameters and YDB types;
+ :param retry_settings: RetrySettings object.
+
+ :return: Result sets or exception in case of execution errors.
+ """
+
+ retry_settings = RetrySettings() if retry_settings is None else retry_settings
+
+ async def wrapped_callee():
+ async with self.checkout() as session:
+ it = await session.execute(query, parameters, *args, **kwargs)
+ return [result_set async for result_set in it]
+
+ return await retry_operation_async(wrapped_callee, retry_settings)
+
+ async def stop(self, timeout=None):
+ pass # TODO: implement
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self.stop()
+
+
+class SimpleQuerySessionCheckoutAsync:
+ def __init__(self, pool: QuerySessionPoolAsync):
+ self._pool = pool
+ self._session = QuerySessionAsync(pool._driver)
+
+ async def __aenter__(self) -> QuerySessionAsync:
+ await self._session.create()
+ return self._session
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self._session.delete()
diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py
new file mode 100644
index 0000000000..627a41d895
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/aio/query/session.py
@@ -0,0 +1,144 @@
+import asyncio
+
+from typing import (
+ Optional,
+)
+
+from .base import AsyncResponseContextIterator
+from .transaction import QueryTxContextAsync
+from .. import _utilities
+from ... import issues
+from ..._grpc.grpcwrapper import common_utils
+from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
+
+from ...query import base
+from ...query.session import (
+ BaseQuerySession,
+ QuerySessionStateEnum,
+)
+
+
+class QuerySessionAsync(BaseQuerySession):
+ """Session object for Query Service. It is not recommended to control
+ session's lifecycle manually - use a QuerySessionPool is always a better choise.
+ """
+
+ _loop: asyncio.AbstractEventLoop
+ _status_stream: _utilities.AsyncResponseIterator = None
+
+ def __init__(
+ self,
+ driver: common_utils.SupportedDriverType,
+ settings: Optional[base.QueryClientSettings] = None,
+ loop: asyncio.AbstractEventLoop = None,
+ ):
+ super(QuerySessionAsync, self).__init__(driver, settings)
+ self._loop = loop if loop is not None else asyncio.get_running_loop()
+
+ async def _attach(self) -> None:
+ self._stream = await self._attach_call()
+ self._status_stream = _utilities.AsyncResponseIterator(
+ self._stream,
+ lambda response: common_utils.ServerStatus.from_proto(response),
+ )
+
+ first_response = await self._status_stream.next()
+ if first_response.status != issues.StatusCode.SUCCESS:
+ pass
+
+ self._state.set_attached(True)
+ self._state._change_state(QuerySessionStateEnum.CREATED)
+
+ self._loop.create_task(self._check_session_status_loop(), name="check session status task")
+
+ async def _check_session_status_loop(self) -> None:
+ try:
+ async for status in self._status_stream:
+ if status.status != issues.StatusCode.SUCCESS:
+ self._state.reset()
+ self._state._change_state(QuerySessionStateEnum.CLOSED)
+ except Exception:
+ if not self._state._already_in(QuerySessionStateEnum.CLOSED):
+ self._state.reset()
+ self._state._change_state(QuerySessionStateEnum.CLOSED)
+
+ async def delete(self) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Deletes a Session of Query Service on server side and releases resources.
+
+ :return: None
+ """
+ if self._state._already_in(QuerySessionStateEnum.CLOSED):
+ return
+
+ self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED)
+ await self._delete_call()
+ self._stream.cancel()
+
+ async def create(self) -> "QuerySessionAsync":
+ """WARNING: This API is experimental and could be changed.
+
+ Creates a Session of Query Service on server side and attaches it.
+
+ :return: QuerySessionSync object.
+ """
+ if self._state._already_in(QuerySessionStateEnum.CREATED):
+ return
+
+ self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
+ await self._create_call()
+ await self._attach()
+
+ return self
+
+ def transaction(self, tx_mode=None) -> QueryTxContextAsync:
+ self._state._check_session_ready_to_use()
+ tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
+
+ return QueryTxContextAsync(
+ self._driver,
+ self._state,
+ self,
+ tx_mode,
+ )
+
+ async def execute(
+ self,
+ query: str,
+ parameters: dict = None,
+ syntax: base.QuerySyntax = None,
+ exec_mode: base.QueryExecMode = None,
+ concurrent_result_sets: bool = False,
+ ) -> AsyncResponseContextIterator:
+ """WARNING: This API is experimental and could be changed.
+
+ Sends a query to Query Service
+ :param query: (YQL or SQL text) to be executed.
+ :param syntax: Syntax of the query, which is a one from the following choises:
+ 1) QuerySyntax.YQL_V1, which is default;
+ 2) QuerySyntax.PG.
+ :param parameters: dict with parameters and YDB types;
+ :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+
+ :return: Iterator with result sets
+ """
+ self._state._check_session_ready_to_use()
+
+ stream_it = await self._execute_call(
+ query=query,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ )
+
+ return AsyncResponseContextIterator(
+ stream_it,
+ lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ settings=self._settings,
+ ),
+ )
diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py
new file mode 100644
index 0000000000..e9993fccc2
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py
@@ -0,0 +1,153 @@
+import logging
+from typing import (
+ Optional,
+)
+
+from .base import AsyncResponseContextIterator
+from ... import issues
+from ...query import base
+from ...query.transaction import (
+ BaseQueryTxContext,
+ QueryTxStateEnum,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class QueryTxContextAsync(BaseQueryTxContext):
+ async def __aenter__(self) -> "QueryTxContextAsync":
+ """
+ Enters a context manager and returns a transaction
+
+ :return: A transaction instance
+ """
+ return self
+
+ async def __aexit__(self, *args, **kwargs):
+ """
+ Closes a transaction context manager and rollbacks transaction if
+ it is not finished explicitly
+ """
+ await self._ensure_prev_stream_finished()
+ if self._tx_state._state == QueryTxStateEnum.BEGINED:
+ # It's strictly recommended to close transactions directly
+ # by using commit_tx=True flag while executing statement or by
+ # .commit() or .rollback() methods, but here we trying to do best
+ # effort to avoid useless open transactions
+ logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
+ try:
+ await self.rollback()
+ except issues.Error:
+ logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)
+
+ async def _ensure_prev_stream_finished(self) -> None:
+ if self._prev_stream is not None:
+ async with self._prev_stream:
+ pass
+ self._prev_stream = None
+
+ async def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextAsync":
+ """WARNING: This API is experimental and could be changed.
+
+ Explicitly begins a transaction
+
+ :param settings: A request settings
+
+ :return: None or exception if begin is failed
+ """
+ await self._begin_call(settings)
+ return self
+
+ async def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Calls commit on a transaction if it is open otherwise is no-op. If transaction execution
+ failed then this method raises PreconditionFailed.
+
+ :param settings: A request settings
+
+ :return: A committed transaction or exception if commit is failed
+ """
+ if self._tx_state._already_in(QueryTxStateEnum.COMMITTED):
+ return
+
+ if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED:
+ self._tx_state._change_state(QueryTxStateEnum.COMMITTED)
+ return
+
+ await self._ensure_prev_stream_finished()
+
+ await self._commit_call(settings)
+
+ async def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
+ failed then this method raises PreconditionFailed.
+
+ :param settings: A request settings
+
+ :return: A committed transaction or exception if commit is failed
+ """
+ if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED):
+ return
+
+ if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED:
+ self._tx_state._change_state(QueryTxStateEnum.ROLLBACKED)
+ return
+
+ await self._ensure_prev_stream_finished()
+
+ await self._rollback_call(settings)
+
+ async def execute(
+ self,
+ query: str,
+ parameters: Optional[dict] = None,
+ commit_tx: Optional[bool] = False,
+ syntax: Optional[base.QuerySyntax] = None,
+ exec_mode: Optional[base.QueryExecMode] = None,
+ concurrent_result_sets: Optional[bool] = False,
+ settings: Optional[base.QueryClientSettings] = None,
+ ) -> AsyncResponseContextIterator:
+ """WARNING: This API is experimental and could be changed.
+
+ Sends a query to Query Service
+ :param query: (YQL or SQL text) to be executed.
+ :param parameters: dict with parameters and YDB types;
+ :param commit_tx: A special flag that allows transaction commit.
+ :param syntax: Syntax of the query, which is a one from the following choises:
+ 1) QuerySyntax.YQL_V1, which is default;
+ 2) QuerySyntax.PG.
+ :param exec_mode: Exec mode of the query, which is a one from the following choises:
+ 1) QueryExecMode.EXECUTE, which is default;
+ 2) QueryExecMode.EXPLAIN;
+ 3) QueryExecMode.VALIDATE;
+ 4) QueryExecMode.PARSE.
+ :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+
+ :return: Iterator with result sets
+ """
+ await self._ensure_prev_stream_finished()
+
+ stream_it = await self._execute_call(
+ query=query,
+ commit_tx=commit_tx,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ )
+
+ settings = settings if settings is not None else self.session._settings
+ self._prev_stream = AsyncResponseContextIterator(
+ stream_it,
+ lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ tx=self,
+ commit_tx=commit_tx,
+ settings=settings,
+ ),
+ )
+ return self._prev_stream
diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py
index eb967abc20..40e512cd6b 100644
--- a/contrib/python/ydb/py3/ydb/query/__init__.py
+++ b/contrib/python/ydb/py3/ydb/query/__init__.py
@@ -11,13 +11,12 @@ __all__ = [
import logging
from .base import (
- IQueryClient,
- SupportedDriverType,
QueryClientSettings,
)
from .session import QuerySessionSync
+from .._grpc.grpcwrapper import common_utils
from .._grpc.grpcwrapper.ydb_query_public_types import (
QueryOnlineReadOnly,
QuerySerializableReadWrite,
@@ -30,8 +29,8 @@ from .pool import QuerySessionPool
logger = logging.getLogger(__name__)
-class QueryClientSync(IQueryClient):
- def __init__(self, driver: SupportedDriverType, query_client_settings: QueryClientSettings = None):
+class QueryClientSync:
+ def __init__(self, driver: common_utils.SupportedDriverType, query_client_settings: QueryClientSettings = None):
logger.warning("QueryClientSync is an experimental API, which could be changed.")
self._driver = driver
self._settings = query_client_settings
diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py
index e08d9f52d0..55087d0c4e 100644
--- a/contrib/python/ydb/py3/ydb/query/base.py
+++ b/contrib/python/ydb/py3/ydb/query/base.py
@@ -2,14 +2,11 @@ import abc
import enum
import functools
+import typing
from typing import (
- Iterator,
Optional,
)
-from .._grpc.grpcwrapper.common_utils import (
- SupportedDriverType,
-)
from .._grpc.grpcwrapper import ydb_query
from .._grpc.grpcwrapper.ydb_query_public_types import (
BaseQueryTxMode,
@@ -20,6 +17,9 @@ from .. import issues
from .. import _utilities
from .. import _apis
+if typing.TYPE_CHECKING:
+ from .transaction import BaseQueryTxContext
+
class QuerySyntax(enum.IntEnum):
UNSPECIFIED = 0
@@ -48,12 +48,38 @@ class SyncResponseContextIterator(_utilities.SyncResponseIterator):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
+ # To close stream on YDB it is necessary to scroll through it to the end
for _ in self:
pass
class QueryClientSettings:
- pass
+ def __init__(self):
+ self._native_datetime_in_result_sets = True
+ self._native_date_in_result_sets = True
+ self._native_json_in_result_sets = True
+ self._native_interval_in_result_sets = True
+ self._native_timestamp_in_result_sets = True
+
+ def with_native_timestamp_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
+ self._native_timestamp_in_result_sets = enabled
+ return self
+
+ def with_native_interval_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
+ self._native_interval_in_result_sets = enabled
+ return self
+
+ def with_native_json_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
+ self._native_json_in_result_sets = enabled
+ return self
+
+ def with_native_date_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
+ self._native_date_in_result_sets = enabled
+ return self
+
+ def with_native_datetime_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
+ self._native_datetime_in_result_sets = enabled
+ return self
class IQuerySessionState(abc.ABC):
@@ -92,229 +118,6 @@ class IQuerySessionState(abc.ABC):
pass
-class IQuerySession(abc.ABC):
- """Session object for Query Service. It is not recommended to control
- session's lifecycle manually - use a QuerySessionPool is always a better choise.
- """
-
- @abc.abstractmethod
- def __init__(self, driver: SupportedDriverType, settings: Optional[QueryClientSettings] = None):
- pass
-
- @abc.abstractmethod
- def create(self) -> "IQuerySession":
- """WARNING: This API is experimental and could be changed.
-
- Creates a Session of Query Service on server side and attaches it.
-
- :return: Session object.
- """
- pass
-
- @abc.abstractmethod
- def delete(self) -> None:
- """WARNING: This API is experimental and could be changed.
-
- Deletes a Session of Query Service on server side and releases resources.
-
- :return: None
- """
- pass
-
- @abc.abstractmethod
- def transaction(self, tx_mode: Optional[BaseQueryTxMode] = None) -> "IQueryTxContext":
- """WARNING: This API is experimental and could be changed.
-
- Creates a transaction context manager with specified transaction mode.
-
- :param tx_mode: Transaction mode, which is a one from the following choises:
- 1) QuerySerializableReadWrite() which is default mode;
- 2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
- 3) QuerySnapshotReadOnly();
- 4) QueryStaleReadOnly().
-
- :return: transaction context manager.
- """
- pass
-
- @abc.abstractmethod
- def execute(
- self,
- query: str,
- syntax: Optional[QuerySyntax] = None,
- exec_mode: Optional[QueryExecMode] = None,
- parameters: Optional[dict] = None,
- concurrent_result_sets: Optional[bool] = False,
- ) -> Iterator:
- """WARNING: This API is experimental and could be changed.
-
- Sends a query to Query Service
- :param query: (YQL or SQL text) to be executed.
- :param syntax: Syntax of the query, which is a one from the following choises:
- 1) QuerySyntax.YQL_V1, which is default;
- 2) QuerySyntax.PG.
- :param parameters: dict with parameters and YDB types;
- :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
-
- :return: Iterator with result sets
- """
-
-
-class IQueryTxContext(abc.ABC):
- """
- An object that provides a simple transaction context manager that allows statements execution
- in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
- transaction control logic, and opens new transaction if:
- 1) By explicit .begin();
- 2) On execution of a first statement, which is strictly recommended method, because that avoids
- useless round trip
-
- This context manager is not thread-safe, so you should not manipulate on it concurrently.
- """
-
- @abc.abstractmethod
- def __init__(
- self,
- driver: SupportedDriverType,
- session_state: IQuerySessionState,
- session: IQuerySession,
- tx_mode: BaseQueryTxMode,
- ):
- """
- An object that provides a simple transaction context manager that allows statements execution
- in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
- transaction control logic, and opens new transaction if:
-
- 1) By explicit .begin() method;
- 2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip
-
- This context manager is not thread-safe, so you should not manipulate on it concurrently.
-
- :param driver: A driver instance
- :param session_state: A state of session
- :param tx_mode: Transaction mode, which is a one from the following choises:
- 1) QuerySerializableReadWrite() which is default mode;
- 2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
- 3) QuerySnapshotReadOnly();
- 4) QueryStaleReadOnly().
- """
- pass
-
- @abc.abstractmethod
- def __enter__(self) -> "IQueryTxContext":
- """
- Enters a context manager and returns a transaction
-
- :return: A transaction instance
- """
- pass
-
- @abc.abstractmethod
- def __exit__(self, *args, **kwargs):
- """
- Closes a transaction context manager and rollbacks transaction if
- it is not finished explicitly
- """
- pass
-
- @property
- @abc.abstractmethod
- def session_id(self) -> str:
- """
- A transaction's session id
-
- :return: A transaction's session id
- """
- pass
-
- @property
- @abc.abstractmethod
- def tx_id(self) -> Optional[str]:
- """
- Returns an id of open transaction or None otherwise
-
- :return: An id of open transaction or None otherwise
- """
- pass
-
- @abc.abstractmethod
- def begin(self, settings: Optional[QueryClientSettings] = None) -> None:
- """WARNING: This API is experimental and could be changed.
-
- Explicitly begins a transaction
-
- :param settings: A request settings
-
- :return: None or exception if begin is failed
- """
- pass
-
- @abc.abstractmethod
- def commit(self, settings: Optional[QueryClientSettings] = None) -> None:
- """WARNING: This API is experimental and could be changed.
-
- Calls commit on a transaction if it is open. If transaction execution
- failed then this method raises PreconditionFailed.
-
- :param settings: A request settings
-
- :return: None or exception if commit is failed
- """
- pass
-
- @abc.abstractmethod
- def rollback(self, settings: Optional[QueryClientSettings] = None) -> None:
- """WARNING: This API is experimental and could be changed.
-
- Calls rollback on a transaction if it is open. If transaction execution
- failed then this method raises PreconditionFailed.
-
- :param settings: A request settings
-
- :return: None or exception if rollback is failed
- """
- pass
-
- @abc.abstractmethod
- def execute(
- self,
- query: str,
- commit_tx: Optional[bool] = False,
- syntax: Optional[QuerySyntax] = None,
- exec_mode: Optional[QueryExecMode] = None,
- parameters: Optional[dict] = None,
- concurrent_result_sets: Optional[bool] = False,
- ) -> Iterator:
- """WARNING: This API is experimental and could be changed.
-
- Sends a query to Query Service
- :param query: (YQL or SQL text) to be executed.
- :param commit_tx: A special flag that allows transaction commit.
- :param syntax: Syntax of the query, which is a one from the following choises:
- 1) QuerySyntax.YQL_V1, which is default;
- 2) QuerySyntax.PG.
- :param exec_mode: Exec mode of the query, which is a one from the following choises:
- 1) QueryExecMode.EXECUTE, which is default;
- 2) QueryExecMode.EXPLAIN;
- 3) QueryExecMode.VALIDATE;
- 4) QueryExecMode.PARSE.
- :param parameters: dict with parameters and YDB types;
- :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
-
- :return: Iterator with result sets
- """
- pass
-
-
-class IQueryClient(abc.ABC):
- def __init__(self, driver: SupportedDriverType, query_client_settings: Optional[QueryClientSettings] = None):
- pass
-
- @abc.abstractmethod
- def session(self) -> IQuerySession:
- pass
-
-
def create_execute_query_request(
query: str,
session_id: str,
@@ -365,15 +168,16 @@ def create_execute_query_request(
def wrap_execute_query_response(
rpc_state: RpcState,
response_pb: _apis.ydb_query.ExecuteQueryResponsePart,
- tx: Optional[IQueryTxContext] = None,
+ tx: Optional["BaseQueryTxContext"] = None,
commit_tx: Optional[bool] = False,
+ settings: Optional[QueryClientSettings] = None,
) -> convert.ResultSet:
issues._process_response(response_pb)
if tx and response_pb.tx_meta and not tx.tx_id:
tx._move_to_beginned(response_pb.tx_meta.id)
if tx and commit_tx:
tx._move_to_commited()
- return convert.ResultSet.from_message(response_pb.result_set)
+ return convert.ResultSet.from_message(response_pb.result_set, settings)
def bad_session_handler(func):
diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py
index e7514cdf76..afe39f0623 100644
--- a/contrib/python/ydb/py3/ydb/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/query/pool.py
@@ -5,7 +5,6 @@ from typing import (
List,
)
-from . import base
from .session import (
QuerySessionSync,
)
@@ -14,6 +13,8 @@ from ..retries import (
retry_operation_sync,
)
from .. import convert
+from .._grpc.grpcwrapper import common_utils
+
logger = logging.getLogger(__name__)
@@ -21,7 +22,7 @@ logger = logging.getLogger(__name__)
class QuerySessionPool:
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
- def __init__(self, driver: base.SupportedDriverType):
+ def __init__(self, driver: common_utils.SupportedDriverType):
"""
:param driver: A driver instance
"""
@@ -55,7 +56,12 @@ class QuerySessionPool:
return retry_operation_sync(wrapped_callee, retry_settings)
def execute_with_retries(
- self, query: str, retry_settings: Optional[RetrySettings] = None, *args, **kwargs
+ self,
+ query: str,
+ parameters: Optional[dict] = None,
+ retry_settings: Optional[RetrySettings] = None,
+ *args,
+ **kwargs,
) -> List[convert.ResultSet]:
"""WARNING: This API is experimental and could be changed.
Special interface to execute a one-shot queries in a safe, retriable way.
@@ -63,6 +69,7 @@ class QuerySessionPool:
method with huge read queries.
:param query: A query, yql or sql text.
+ :param parameters: dict with parameters and YDB types;
:param retry_settings: RetrySettings object.
:return: Result sets or exception in case of execution errors.
@@ -72,18 +79,27 @@ class QuerySessionPool:
def wrapped_callee():
with self.checkout() as session:
- it = session.execute(query, *args, **kwargs)
+ it = session.execute(query, parameters, *args, **kwargs)
return [result_set for result_set in it]
return retry_operation_sync(wrapped_callee, retry_settings)
+ def stop(self, timeout=None):
+ pass # TODO: implement
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.stop()
+
class SimpleQuerySessionCheckout:
def __init__(self, pool: QuerySessionPool):
self._pool = pool
self._session = QuerySessionSync(pool._driver)
- def __enter__(self) -> base.IQuerySession:
+ def __enter__(self) -> QuerySessionSync:
self._session.create()
return self._session
diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py
index d6034d348a..4b051dc16f 100644
--- a/contrib/python/ydb/py3/ydb/query/session.py
+++ b/contrib/python/ydb/py3/ydb/query/session.py
@@ -15,7 +15,7 @@ from .._grpc.grpcwrapper import common_utils
from .._grpc.grpcwrapper import ydb_query as _ydb_query
from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
-from .transaction import BaseQueryTxContext
+from .transaction import QueryTxContextSync
logger = logging.getLogger(__name__)
@@ -126,12 +126,12 @@ def wrapper_delete_session(
return session
-class BaseQuerySession(base.IQuerySession):
- _driver: base.SupportedDriverType
+class BaseQuerySession:
+ _driver: common_utils.SupportedDriverType
_settings: base.QueryClientSettings
_state: QuerySessionState
- def __init__(self, driver: base.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
+ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
self._driver = driver
self._settings = settings if settings is not None else base.QueryClientSettings()
self._state = QuerySessionState(settings)
@@ -224,7 +224,9 @@ class QuerySessionSync(BaseQuerySession):
self._state.reset()
self._state._change_state(QuerySessionStateEnum.CLOSED)
except Exception:
- pass
+ if not self._state._already_in(QuerySessionStateEnum.CLOSED):
+ self._state.reset()
+ self._state._change_state(QuerySessionStateEnum.CLOSED)
def delete(self) -> None:
"""WARNING: This API is experimental and could be changed.
@@ -256,7 +258,7 @@ class QuerySessionSync(BaseQuerySession):
return self
- def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> base.IQueryTxContext:
+ def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContextSync:
"""WARNING: This API is experimental and could be changed.
Creates a transaction context manager with specified transaction mode.
@@ -273,7 +275,7 @@ class QuerySessionSync(BaseQuerySession):
tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
- return BaseQueryTxContext(
+ return QueryTxContextSync(
self._driver,
self._state,
self,
@@ -283,9 +285,9 @@ class QuerySessionSync(BaseQuerySession):
def execute(
self,
query: str,
+ parameters: dict = None,
syntax: base.QuerySyntax = None,
exec_mode: base.QueryExecMode = None,
- parameters: dict = None,
concurrent_result_sets: bool = False,
) -> base.SyncResponseContextIterator:
"""WARNING: This API is experimental and could be changed.
@@ -313,5 +315,9 @@ class QuerySessionSync(BaseQuerySession):
return base.SyncResponseContextIterator(
stream_it,
- lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp),
+ lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ settings=self._settings,
+ ),
)
diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py
index 0a49320293..750a94b0b5 100644
--- a/contrib/python/ydb/py3/ydb/query/transaction.py
+++ b/contrib/python/ydb/py3/ydb/query/transaction.py
@@ -169,7 +169,7 @@ def wrap_tx_rollback_response(
return tx
-class BaseQueryTxContext(base.IQueryTxContext):
+class BaseQueryTxContext:
def __init__(self, driver, session_state, session, tx_mode):
"""
An object that provides a simple transaction context manager that allows statements execution
@@ -196,31 +196,6 @@ class BaseQueryTxContext(base.IQueryTxContext):
self.session = session
self._prev_stream = None
- def __enter__(self) -> "BaseQueryTxContext":
- """
- Enters a context manager and returns a transaction
-
- :return: A transaction instance
- """
- return self
-
- def __exit__(self, *args, **kwargs):
- """
- Closes a transaction context manager and rollbacks transaction if
- it is not finished explicitly
- """
- self._ensure_prev_stream_finished()
- if self._tx_state._state == QueryTxStateEnum.BEGINED:
- # It's strictly recommended to close transactions directly
- # by using commit_tx=True flag while executing statement or by
- # .commit() or .rollback() methods, but here we trying to do best
- # effort to avoid useless open transactions
- logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
- try:
- self.rollback()
- except issues.Error:
- logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)
-
@property
def session_id(self) -> str:
"""
@@ -240,6 +215,8 @@ class BaseQueryTxContext(base.IQueryTxContext):
return self._tx_state.tx_id
def _begin_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext":
+ self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED)
+
return self._driver(
_create_begin_transaction_request(self._session_state, self._tx_state),
_apis.QueryService.Stub,
@@ -250,6 +227,8 @@ class BaseQueryTxContext(base.IQueryTxContext):
)
def _commit_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext":
+ self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED)
+
return self._driver(
_create_commit_transaction_request(self._session_state, self._tx_state),
_apis.QueryService.Stub,
@@ -260,6 +239,8 @@ class BaseQueryTxContext(base.IQueryTxContext):
)
def _rollback_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext":
+ self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED)
+
return self._driver(
_create_rollback_transaction_request(self._session_state, self._tx_state),
_apis.QueryService.Stub,
@@ -278,6 +259,8 @@ class BaseQueryTxContext(base.IQueryTxContext):
parameters: dict = None,
concurrent_result_sets: bool = False,
) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
+ self._tx_state._check_tx_ready_to_use()
+
request = base.create_execute_query_request(
query=query,
session_id=self._session_state.session_id,
@@ -296,12 +279,6 @@ class BaseQueryTxContext(base.IQueryTxContext):
_apis.QueryService.ExecuteQuery,
)
- def _ensure_prev_stream_finished(self) -> None:
- if self._prev_stream is not None:
- for _ in self._prev_stream:
- pass
- self._prev_stream = None
-
def _move_to_beginned(self, tx_id: str) -> None:
if self._tx_state._already_in(QueryTxStateEnum.BEGINED):
return
@@ -313,19 +290,52 @@ class BaseQueryTxContext(base.IQueryTxContext):
return
self._tx_state._change_state(QueryTxStateEnum.COMMITTED)
- def begin(self, settings: Optional[base.QueryClientSettings] = None) -> None:
+
+class QueryTxContextSync(BaseQueryTxContext):
+ def __enter__(self) -> "BaseQueryTxContext":
+ """
+ Enters a context manager and returns a transaction
+
+ :return: A transaction instance
+ """
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ """
+ Closes a transaction context manager and rollbacks transaction if
+ it is not finished explicitly
+ """
+ self._ensure_prev_stream_finished()
+ if self._tx_state._state == QueryTxStateEnum.BEGINED:
+ # It's strictly recommended to close transactions directly
+ # by using commit_tx=True flag while executing statement or by
+ # .commit() or .rollback() methods, but here we trying to do best
+ # effort to avoid useless open transactions
+ logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
+ try:
+ self.rollback()
+ except issues.Error:
+ logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)
+
+ def _ensure_prev_stream_finished(self) -> None:
+ if self._prev_stream is not None:
+ with self._prev_stream:
+ pass
+ self._prev_stream = None
+
+ def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextSync":
"""WARNING: This API is experimental and could be changed.
Explicitly begins a transaction
:param settings: A request settings
- :return: None or exception if begin is failed
+ :return: Transaction object or exception if begin is failed
"""
- self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED)
-
self._begin_call(settings)
+ return self
+
def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None:
"""WARNING: This API is experimental and could be changed.
@@ -338,43 +348,51 @@ class BaseQueryTxContext(base.IQueryTxContext):
"""
if self._tx_state._already_in(QueryTxStateEnum.COMMITTED):
return
- self._ensure_prev_stream_finished()
if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED:
self._tx_state._change_state(QueryTxStateEnum.COMMITTED)
return
- self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED)
+ self._ensure_prev_stream_finished()
self._commit_call(settings)
def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
+ failed then this method raises PreconditionFailed.
+
+ :param settings: A request settings
+
+ :return: A committed transaction or exception if commit is failed
+ """
if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED):
return
- self._ensure_prev_stream_finished()
-
if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED:
self._tx_state._change_state(QueryTxStateEnum.ROLLBACKED)
return
- self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED)
+ self._ensure_prev_stream_finished()
self._rollback_call(settings)
def execute(
self,
query: str,
+ parameters: Optional[dict] = None,
commit_tx: Optional[bool] = False,
syntax: Optional[base.QuerySyntax] = None,
exec_mode: Optional[base.QueryExecMode] = None,
- parameters: Optional[dict] = None,
concurrent_result_sets: Optional[bool] = False,
+ settings: Optional[base.QueryClientSettings] = None,
) -> base.SyncResponseContextIterator:
"""WARNING: This API is experimental and could be changed.
Sends a query to Query Service
:param query: (YQL or SQL text) to be executed.
+ :param parameters: dict with parameters and YDB types;
:param commit_tx: A special flag that allows transaction commit.
:param syntax: Syntax of the query, which is a one from the following choises:
1) QuerySyntax.YQL_V1, which is default;
@@ -384,13 +402,12 @@ class BaseQueryTxContext(base.IQueryTxContext):
2) QueryExecMode.EXPLAIN;
3) QueryExecMode.VALIDATE;
4) QueryExecMode.PARSE.
- :param parameters: dict with parameters and YDB types;
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+ :param settings: An additional request settings QueryClientSettings;
:return: Iterator with result sets
"""
self._ensure_prev_stream_finished()
- self._tx_state._check_tx_ready_to_use()
stream_it = self._execute_call(
query=query,
@@ -400,6 +417,8 @@ class BaseQueryTxContext(base.IQueryTxContext):
parameters=parameters,
concurrent_result_sets=concurrent_result_sets,
)
+
+ settings = settings if settings is not None else self.session._settings
self._prev_stream = base.SyncResponseContextIterator(
stream_it,
lambda resp: base.wrap_execute_query_response(
@@ -407,6 +426,7 @@ class BaseQueryTxContext(base.IQueryTxContext):
response_pb=resp,
tx=self,
commit_tx=commit_tx,
+ settings=settings,
),
)
return self._prev_stream
diff --git a/contrib/python/ydb/py3/ydb/retries.py b/contrib/python/ydb/py3/ydb/retries.py
index 5d4f6e6a0f..c9c23b1a91 100644
--- a/contrib/python/ydb/py3/ydb/retries.py
+++ b/contrib/python/ydb/py3/ydb/retries.py
@@ -1,3 +1,4 @@
+import asyncio
import random
import time
@@ -134,3 +135,27 @@ def retry_operation_sync(callee, retry_settings=None, *args, **kwargs):
time.sleep(next_opt.timeout)
else:
return next_opt.result
+
+
+async def retry_operation_async(callee, retry_settings=None, *args, **kwargs): # pylint: disable=W1113
+ """
+ The retry operation helper can be used to retry a coroutine that raises YDB specific
+ exceptions.
+
+ :param callee: A coroutine to retry.
+ :param retry_settings: An instance of ydb.RetrySettings that describes how the coroutine
+ should be retried. If None, default instance of retry settings will be used.
+ :param args: A tuple with positional arguments to be passed into the coroutine.
+ :param kwargs: A dictionary with keyword arguments to be passed into the coroutine.
+
+ Returns awaitable result of coroutine. If retries are not succussful exception is raised.
+ """
+ opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
+ for next_opt in opt_generator:
+ if isinstance(next_opt, YdbRetryOperationSleepOpt):
+ await asyncio.sleep(next_opt.timeout)
+ else:
+ try:
+ return await next_opt.result
+ except BaseException as e: # pylint: disable=W0703
+ next_opt.set_exception(e)
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index 567cda12c7..76caab7b48 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.15.0"
+VERSION = "3.16.0"