aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-08-09 17:57:55 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-08-09 18:56:25 +0300
commit88da31b9c96e50f21978fb1a45b3fd273f1b6dce (patch)
treeb9181add08a7b1ab8f256a7b1bb7d0a9c33c3595 /contrib/python
parentc3665c2967de61ecb4751064a3aa0284ba5f11de (diff)
downloadydb-88da31b9c96e50f21978fb1a45b3fd273f1b6dce.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make11
-rw-r--r--contrib/python/ydb/py3/ydb/__init__.py6
-rw-r--r--contrib/python/ydb/py3/ydb/_apis.py40
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query.py181
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py65
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py2
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py4
-rw-r--r--contrib/python/ydb/py3/ydb/convert.py60
-rw-r--r--contrib/python/ydb/py3/ydb/draft/_apis.py31
-rw-r--r--contrib/python/ydb/py3/ydb/draft/dynamic_config.py3
-rw-r--r--contrib/python/ydb/py3/ydb/driver.py7
-rw-r--r--contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py212
-rw-r--r--contrib/python/ydb/py3/ydb/query/__init__.py40
-rw-r--r--contrib/python/ydb/py3/ydb/query/base.py388
-rw-r--r--contrib/python/ydb/py3/ydb/query/pool.py91
-rw-r--r--contrib/python/ydb/py3/ydb/query/session.py317
-rw-r--r--contrib/python/ydb/py3/ydb/query/transaction.py412
-rw-r--r--contrib/python/ydb/py3/ydb/retries.py136
-rw-r--r--contrib/python/ydb/py3/ydb/table.py143
-rw-r--r--contrib/python/ydb/py3/ydb/types.py7
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
22 files changed, 1994 insertions, 166 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 64e6538edb..5b155408fc 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.13.0
+Version: 3.15.0
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index d0fb350d8c..cb2a88454a 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.13.0)
+VERSION(3.15.0)
LICENSE(Apache-2.0)
@@ -29,6 +29,8 @@ PY_SRCS(
ydb/_grpc/common/__init__.py
ydb/_grpc/grpcwrapper/__init__.py
ydb/_grpc/grpcwrapper/common_utils.py
+ ydb/_grpc/grpcwrapper/ydb_query.py
+ ydb/_grpc/grpcwrapper/ydb_query_public_types.py
ydb/_grpc/grpcwrapper/ydb_scheme.py
ydb/_grpc/grpcwrapper/ydb_topic.py
ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
@@ -68,6 +70,7 @@ PY_SRCS(
ydb/dbapi/errors.py
ydb/default_pem.py
ydb/draft/__init__.py
+ ydb/draft/_apis.py
ydb/draft/dynamic_config.py
ydb/driver.py
ydb/export.py
@@ -82,7 +85,13 @@ PY_SRCS(
ydb/oauth2_token_exchange/token_source.py
ydb/operation.py
ydb/pool.py
+ ydb/query/__init__.py
+ ydb/query/base.py
+ ydb/query/pool.py
+ ydb/query/session.py
+ ydb/query/transaction.py
ydb/resolver.py
+ ydb/retries.py
ydb/scheme.py
ydb/scripting.py
ydb/settings.py
diff --git a/contrib/python/ydb/py3/ydb/__init__.py b/contrib/python/ydb/py3/ydb/__init__.py
index 8bfba3a902..db4f6e607f 100644
--- a/contrib/python/ydb/py3/ydb/__init__.py
+++ b/contrib/python/ydb/py3/ydb/__init__.py
@@ -2,6 +2,10 @@ from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
+from .ydb_version import VERSION
+
+__version__ = VERSION
+
from .credentials import * # noqa
from .driver import * # noqa
from .global_settings import * # noqa
@@ -19,6 +23,8 @@ from .import_client import * # noqa
from .tracing import * # noqa
from .topic import * # noqa
from .draft import * # noqa
+from .query import * # noqa
+from .retries import * # noqa
try:
import ydb.aio as aio # noqa
diff --git a/contrib/python/ydb/py3/ydb/_apis.py b/contrib/python/ydb/py3/ydb/_apis.py
index e340150996..fc28d0ceb2 100644
--- a/contrib/python/ydb/py3/ydb/_apis.py
+++ b/contrib/python/ydb/py3/ydb/_apis.py
@@ -9,10 +9,7 @@ try:
ydb_table_v1_pb2_grpc,
ydb_operation_v1_pb2_grpc,
ydb_topic_v1_pb2_grpc,
- )
-
- from ydb.public.api.grpc.draft import (
- ydb_dynamic_config_v1_pb2_grpc,
+ ydb_query_v1_pb2_grpc,
)
from ydb.public.api.protos import (
@@ -23,11 +20,9 @@ try:
ydb_value_pb2,
ydb_operation_pb2,
ydb_common_pb2,
+ ydb_query_pb2,
)
- from ydb.public.api.protos.draft import (
- ydb_dynamic_config_pb2,
- )
except ImportError:
from contrib.ydb.public.api.grpc import (
ydb_cms_v1_pb2_grpc,
@@ -36,10 +31,7 @@ except ImportError:
ydb_table_v1_pb2_grpc,
ydb_operation_v1_pb2_grpc,
ydb_topic_v1_pb2_grpc,
- )
-
- from contrib.ydb.public.api.grpc.draft import (
- ydb_dynamic_config_v1_pb2_grpc,
+ ydb_query_v1_pb2_grpc,
)
from contrib.ydb.public.api.protos import (
@@ -50,10 +42,7 @@ except ImportError:
ydb_value_pb2,
ydb_operation_pb2,
ydb_common_pb2,
- )
-
- from contrib.ydb.public.api.protos.draft import (
- ydb_dynamic_config_pb2,
+ ydb_query_pb2,
)
@@ -65,7 +54,7 @@ ydb_scheme = ydb_scheme_pb2
ydb_table = ydb_table_pb2
ydb_discovery = ydb_discovery_pb2
ydb_operation = ydb_operation_pb2
-ydb_dynamic_config = ydb_dynamic_config_pb2
+ydb_query = ydb_query_pb2
class CmsService(object):
@@ -128,10 +117,17 @@ class TopicService(object):
StreamWrite = "StreamWrite"
-class DynamicConfigService(object):
- Stub = ydb_dynamic_config_v1_pb2_grpc.DynamicConfigServiceStub
+class QueryService(object):
+ Stub = ydb_query_v1_pb2_grpc.QueryServiceStub
+
+ CreateSession = "CreateSession"
+ DeleteSession = "DeleteSession"
+ AttachSession = "AttachSession"
+
+ BeginTransaction = "BeginTransaction"
+ CommitTransaction = "CommitTransaction"
+ RollbackTransaction = "RollbackTransaction"
- ReplaceConfig = "ReplaceConfig"
- SetConfig = "SetConfig"
- GetConfig = "GetConfig"
- GetNodeLabels = "GetNodeLabels"
+ ExecuteQuery = "ExecuteQuery"
+ ExecuteScript = "ExecuteScript"
+ FetchScriptResults = "FetchScriptResults"
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query.py
new file mode 100644
index 0000000000..8fc09b0072
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query.py
@@ -0,0 +1,181 @@
+from dataclasses import dataclass
+import typing
+from typing import Optional
+
+
+try:
+ from ydb.public.api.protos import ydb_query_pb2
+except ImportError:
+ from contrib.ydb.public.api.protos import ydb_query_pb2
+
+from . import ydb_query_public_types as public_types
+
+from .common_utils import (
+ IFromProto,
+ IToProto,
+ IFromPublic,
+ ServerStatus,
+)
+
+from ... import convert
+
+
+@dataclass
+class CreateSessionResponse(IFromProto):
+ status: ServerStatus
+ session_id: str
+ node_id: int
+
+ @staticmethod
+ def from_proto(msg: ydb_query_pb2.CreateSessionResponse) -> "CreateSessionResponse":
+ return CreateSessionResponse(
+ status=ServerStatus(msg.status, msg.issues),
+ session_id=msg.session_id,
+ node_id=msg.node_id,
+ )
+
+
+@dataclass
+class DeleteSessionResponse(IFromProto):
+ status: ServerStatus
+
+ @staticmethod
+ def from_proto(msg: ydb_query_pb2.DeleteSessionResponse) -> "DeleteSessionResponse":
+ return DeleteSessionResponse(status=ServerStatus(msg.status, msg.issues))
+
+
+@dataclass
+class AttachSessionRequest(IToProto):
+ session_id: str
+
+ def to_proto(self) -> ydb_query_pb2.AttachSessionRequest:
+ return ydb_query_pb2.AttachSessionRequest(session_id=self.session_id)
+
+
+@dataclass
+class TransactionMeta(IFromProto):
+ tx_id: str
+
+ @staticmethod
+ def from_proto(msg: ydb_query_pb2.TransactionMeta) -> "TransactionMeta":
+ return TransactionMeta(tx_id=msg.id)
+
+
+@dataclass
+class TransactionSettings(IFromPublic, IToProto):
+ tx_mode: public_types.BaseQueryTxMode
+
+ @staticmethod
+ def from_public(tx_mode: public_types.BaseQueryTxMode) -> "TransactionSettings":
+ return TransactionSettings(tx_mode=tx_mode)
+
+ def to_proto(self) -> ydb_query_pb2.TransactionSettings:
+ if self.tx_mode.name == "snapshot_read_only":
+ return ydb_query_pb2.TransactionSettings(snapshot_read_only=self.tx_mode.to_proto())
+ if self.tx_mode.name == "serializable_read_write":
+ return ydb_query_pb2.TransactionSettings(serializable_read_write=self.tx_mode.to_proto())
+ if self.tx_mode.name == "online_read_only":
+ return ydb_query_pb2.TransactionSettings(online_read_only=self.tx_mode.to_proto())
+ if self.tx_mode.name == "stale_read_only":
+ return ydb_query_pb2.TransactionSettings(stale_read_only=self.tx_mode.to_proto())
+
+
+@dataclass
+class BeginTransactionRequest(IToProto):
+ session_id: str
+ tx_settings: TransactionSettings
+
+ def to_proto(self) -> ydb_query_pb2.BeginTransactionRequest:
+ return ydb_query_pb2.BeginTransactionRequest(
+ session_id=self.session_id,
+ tx_settings=self.tx_settings.to_proto(),
+ )
+
+
+@dataclass
+class BeginTransactionResponse(IFromProto):
+ status: Optional[ServerStatus]
+ tx_meta: TransactionMeta
+
+ @staticmethod
+ def from_proto(msg: ydb_query_pb2.BeginTransactionResponse) -> "BeginTransactionResponse":
+ return BeginTransactionResponse(
+ status=ServerStatus(msg.status, msg.issues),
+ tx_meta=TransactionMeta.from_proto(msg.tx_meta),
+ )
+
+
+@dataclass
+class CommitTransactionResponse(IFromProto):
+ status: Optional[ServerStatus]
+
+ @staticmethod
+ def from_proto(msg: ydb_query_pb2.CommitTransactionResponse) -> "CommitTransactionResponse":
+ return CommitTransactionResponse(
+ status=ServerStatus(msg.status, msg.issues),
+ )
+
+
+@dataclass
+class RollbackTransactionResponse(IFromProto):
+ status: Optional[ServerStatus]
+
+ @staticmethod
+ def from_proto(msg: ydb_query_pb2.RollbackTransactionResponse) -> "RollbackTransactionResponse":
+ return RollbackTransactionResponse(
+ status=ServerStatus(msg.status, msg.issues),
+ )
+
+
+@dataclass
+class QueryContent(IFromPublic, IToProto):
+ text: str
+ syntax: int
+
+ @staticmethod
+ def from_public(query: str, syntax: int) -> "QueryContent":
+ return QueryContent(text=query, syntax=syntax)
+
+ def to_proto(self) -> ydb_query_pb2.QueryContent:
+ return ydb_query_pb2.QueryContent(text=self.text, syntax=self.syntax)
+
+
+@dataclass
+class TransactionControl(IToProto):
+ begin_tx: Optional[TransactionSettings]
+ commit_tx: Optional[bool]
+ tx_id: Optional[str]
+
+ def to_proto(self) -> ydb_query_pb2.TransactionControl:
+ if self.tx_id:
+ return ydb_query_pb2.TransactionControl(
+ tx_id=self.tx_id,
+ commit_tx=self.commit_tx,
+ )
+ return ydb_query_pb2.TransactionControl(
+ begin_tx=self.begin_tx.to_proto(),
+ commit_tx=self.commit_tx,
+ )
+
+
+@dataclass
+class ExecuteQueryRequest(IToProto):
+ session_id: str
+ query_content: QueryContent
+ tx_control: TransactionControl
+ concurrent_result_sets: bool
+ exec_mode: int
+ parameters: dict
+ stats_mode: int
+
+ def to_proto(self) -> ydb_query_pb2.ExecuteQueryRequest:
+ tx_control = self.tx_control.to_proto() if self.tx_control is not None else self.tx_control
+ return ydb_query_pb2.ExecuteQueryRequest(
+ session_id=self.session_id,
+ tx_control=tx_control,
+ query_content=self.query_content.to_proto(),
+ exec_mode=self.exec_mode,
+ stats_mode=self.stats_mode,
+ concurrent_result_sets=self.concurrent_result_sets,
+ parameters=convert.query_parameters_to_pb(self.parameters),
+ )
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py
new file mode 100644
index 0000000000..3ef2d55430
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py
@@ -0,0 +1,65 @@
+import abc
+import typing
+
+from .common_utils import IToProto
+
+try:
+ from ydb.public.api.protos import ydb_query_pb2
+except ImportError:
+ from contrib.ydb.public.api.protos import ydb_query_pb2
+
+
+class BaseQueryTxMode(IToProto):
+ @property
+ @abc.abstractmethod
+ def name(self) -> str:
+ pass
+
+
+class QuerySnapshotReadOnly(BaseQueryTxMode):
+ def __init__(self):
+ self._name = "snapshot_read_only"
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ def to_proto(self) -> ydb_query_pb2.SnapshotModeSettings:
+ return ydb_query_pb2.SnapshotModeSettings()
+
+
+class QuerySerializableReadWrite(BaseQueryTxMode):
+ def __init__(self):
+ self._name = "serializable_read_write"
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ def to_proto(self) -> ydb_query_pb2.SerializableModeSettings:
+ return ydb_query_pb2.SerializableModeSettings()
+
+
+class QueryOnlineReadOnly(BaseQueryTxMode):
+ def __init__(self, allow_inconsistent_reads: bool = False):
+ self.allow_inconsistent_reads = allow_inconsistent_reads
+ self._name = "online_read_only"
+
+ @property
+ def name(self):
+ return self._name
+
+ def to_proto(self) -> ydb_query_pb2.OnlineModeSettings:
+ return ydb_query_pb2.OnlineModeSettings(allow_inconsistent_reads=self.allow_inconsistent_reads)
+
+
+class QueryStaleReadOnly(BaseQueryTxMode):
+ def __init__(self):
+ self._name = "stale_read_only"
+
+ @property
+ def name(self):
+ return self._name
+
+ def to_proto(self) -> ydb_query_pb2.StaleModeSettings:
+ return ydb_query_pb2.StaleModeSettings()
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
index 17fb288555..b907ee2794 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
@@ -10,7 +10,7 @@ from typing import (
Callable,
)
-from ..table import RetrySettings
+from ..retries import RetrySettings
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
index 007c8a54b5..585e88abd7 100644
--- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
@@ -26,9 +26,9 @@ from .topic_writer import (
from .. import (
_apis,
issues,
- check_retriable_error,
- RetrySettings,
)
+from .._errors import check_retriable_error
+from ..retries import RetrySettings
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec
from .._grpc.grpcwrapper.ydb_topic import (
UpdateTokenRequest,
diff --git a/contrib/python/ydb/py3/ydb/convert.py b/contrib/python/ydb/py3/ydb/convert.py
index 6c4164bc44..63a5dbe4e3 100644
--- a/contrib/python/ydb/py3/ydb/convert.py
+++ b/contrib/python/ydb/py3/ydb/convert.py
@@ -281,6 +281,66 @@ def parameters_to_pb(parameters_types, parameters_values):
return param_values_pb
+def query_parameters_to_pb(parameters):
+ if parameters is None or not parameters:
+ return {}
+
+ parameters_types = {}
+ parameters_values = {}
+ for name, value in parameters.items():
+ if isinstance(value, types.TypedValue):
+ if value.value_type is None:
+ value.value_type = _type_from_python_native(value.value)
+ elif isinstance(value, tuple):
+ value = types.TypedValue(*value)
+ else:
+ value = types.TypedValue(value, _type_from_python_native(value))
+
+ parameters_values[name] = value.value
+ parameters_types[name] = value.value_type
+
+ return parameters_to_pb(parameters_types, parameters_values)
+
+
+_from_python_type_map = {
+ int: types.PrimitiveType.Int64,
+ float: types.PrimitiveType.Float,
+ bool: types.PrimitiveType.Bool,
+ str: types.PrimitiveType.Utf8,
+}
+
+
+def _type_from_python_native(value):
+ t = type(value)
+
+ if t in _from_python_type_map:
+ return _from_python_type_map[t]
+
+ if t == list:
+ if len(value) == 0:
+ raise ValueError(
+ "Could not map empty list to any type, please specify "
+ "it manually by tuple(value, type) or ydb.TypedValue"
+ )
+ entry_type = _type_from_python_native(value[0])
+ return types.ListType(entry_type)
+
+ if t == dict:
+ if len(value) == 0:
+ raise ValueError(
+ "Could not map empty dict to any type, please specify "
+ "it manually by tuple(value, type) or ydb.TypedValue"
+ )
+ entry = list(value.items())[0]
+ key_type = _type_from_python_native(entry[0])
+ value_type = _type_from_python_native(entry[1])
+ return types.DictType(key_type, value_type)
+
+ raise ValueError(
+ "Could not map value to any type, please specify it manually by tuple(value, type) or ydb.TypedValue"
+ )
+
+
def _unwrap_optionality(column):
c_type = column.type
current_type = c_type.WhichOneof("type")
diff --git a/contrib/python/ydb/py3/ydb/draft/_apis.py b/contrib/python/ydb/py3/ydb/draft/_apis.py
new file mode 100644
index 0000000000..8c9b56a065
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/draft/_apis.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+import typing
+
+try:
+ from ydb.public.api.grpc.draft import (
+ ydb_dynamic_config_v1_pb2_grpc,
+ )
+
+ from ydb.public.api.protos.draft import (
+ ydb_dynamic_config_pb2,
+ )
+except ImportError:
+ from contrib.ydb.public.api.grpc.draft import (
+ ydb_dynamic_config_v1_pb2_grpc,
+ )
+
+ from contrib.ydb.public.api.protos.draft import (
+ ydb_dynamic_config_pb2,
+ )
+
+
+ydb_dynamic_config = ydb_dynamic_config_pb2
+
+
+class DynamicConfigService(object):
+ Stub = ydb_dynamic_config_v1_pb2_grpc.DynamicConfigServiceStub
+
+ ReplaceConfig = "ReplaceConfig"
+ SetConfig = "SetConfig"
+ GetConfig = "GetConfig"
+ GetNodeLabels = "GetNodeLabels"
diff --git a/contrib/python/ydb/py3/ydb/draft/dynamic_config.py b/contrib/python/ydb/py3/ydb/draft/dynamic_config.py
index afec19eca7..4648b29a77 100644
--- a/contrib/python/ydb/py3/ydb/draft/dynamic_config.py
+++ b/contrib/python/ydb/py3/ydb/draft/dynamic_config.py
@@ -1,6 +1,7 @@
import abc
from abc import abstractmethod
-from .. import issues, operation, _apis
+from . import _apis
+from .. import issues, operation
class IDynamicConfigClient(abc.ABC):
diff --git a/contrib/python/ydb/py3/ydb/driver.py b/contrib/python/ydb/py3/ydb/driver.py
index 89109b9b57..ecd3319e87 100644
--- a/contrib/python/ydb/py3/ydb/driver.py
+++ b/contrib/python/ydb/py3/ydb/driver.py
@@ -54,6 +54,13 @@ def credentials_from_env_variables(tracer=None):
ctx.trace({"credentials.access_token": True})
return credentials_impl.AuthTokenCredentials(access_token)
+ oauth2_key_file = os.getenv("YDB_OAUTH2_KEY_FILE")
+ if oauth2_key_file:
+ ctx.trace({"credentials.oauth2_key_file": True})
+ import ydb.oauth2_token_exchange
+
+ return ydb.oauth2_token_exchange.Oauth2TokenExchangeCredentials.from_file(oauth2_key_file)
+
ctx.trace(
{
"credentials.env_default": True,
diff --git a/contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py b/contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py
index e4d1db9498..819f719b6a 100644
--- a/contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py
+++ b/contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py
@@ -2,6 +2,8 @@
import typing
import json
import abc
+import os
+import base64
try:
import requests
@@ -9,7 +11,24 @@ except ImportError:
requests = None
from ydb import credentials, tracing, issues
-from .token_source import TokenSource
+from .token_source import TokenSource, FixedTokenSource, JwtTokenSource
+
+
+# method -> is HMAC
+_supported_uppercase_jwt_algs = {
+ "HS256": True,
+ "HS384": True,
+ "HS512": True,
+ "RS256": False,
+ "RS384": False,
+ "RS512": False,
+ "PS256": False,
+ "PS384": False,
+ "PS512": False,
+ "ES256": False,
+ "ES384": False,
+ "ES512": False,
+}
class Oauth2TokenExchangeCredentialsBase(abc.ABC):
@@ -20,7 +39,7 @@ class Oauth2TokenExchangeCredentialsBase(abc.ABC):
actor_token_source: typing.Optional[TokenSource] = None,
audience: typing.Union[typing.List[str], str, None] = None,
scope: typing.Union[typing.List[str], str, None] = None,
- resource: typing.Optional[str] = None,
+ resource: typing.Union[typing.List[str], str, None] = None,
grant_type: str = "urn:ietf:params:oauth:grant-type:token-exchange",
requested_token_type: str = "urn:ietf:params:oauth:token-type:access_token",
):
@@ -94,6 +113,193 @@ class Oauth2TokenExchangeCredentialsBase(abc.ABC):
return params
+ @classmethod
+ def _jwt_token_source_from_config(cls, cfg_json):
+ signing_method = cls._required_string_from_config(cfg_json, "alg")
+ is_hmac = _supported_uppercase_jwt_algs.get(signing_method.upper(), None)
+ if is_hmac is not None: # we know this method => do uppercase
+ signing_method = signing_method.upper()
+ private_key = cls._required_string_from_config(cfg_json, "private-key")
+ if is_hmac: # decode from base64
+ private_key = base64.b64decode(private_key + "===") # to allow unpadded strings
+ return JwtTokenSource(
+ signing_method=signing_method,
+ private_key=private_key,
+ key_id=cls._string_with_default_from_config(cfg_json, "kid", None),
+ issuer=cls._string_with_default_from_config(cfg_json, "iss", None),
+ subject=cls._string_with_default_from_config(cfg_json, "sub", None),
+ audience=cls._list_of_strings_or_single_from_config(cfg_json, "aud"),
+ id=cls._string_with_default_from_config(cfg_json, "jti", None),
+ token_ttl_seconds=cls._duration_seconds_from_config(cfg_json, "ttl", 3600),
+ )
+
+ @classmethod
+ def _fixed_token_source_from_config(cls, cfg_json):
+ return FixedTokenSource(
+ cls._required_string_from_config(cfg_json, "token"),
+ cls._required_string_from_config(cfg_json, "token-type"),
+ )
+
+ @classmethod
+ def _token_source_from_config(cls, cfg_json, key_name):
+ value = cfg_json.get(key_name, None)
+ if value is None:
+ return None
+ if not isinstance(value, dict):
+ raise Exception('Key "{}" is expected to be a json map'.format(key_name))
+
+ source_type = cls._required_string_from_config(value, "type")
+ if source_type.upper() == "FIXED":
+ return cls._fixed_token_source_from_config(value)
+ if source_type.upper() == "JWT":
+ return cls._jwt_token_source_from_config(value)
+ raise Exception('"{}": unknown token source type: "{}"'.format(key_name, source_type))
+
+ @classmethod
+ def _list_of_strings_or_single_from_config(cls, cfg_json, key_name):
+ value = cfg_json.get(key_name, None)
+ if value is None:
+ return None
+ if isinstance(value, list):
+ for val in value:
+ if not isinstance(val, str) or not val:
+ raise Exception(
+ 'Key "{}" is expected to be a single string or list of nonempty strings'.format(key_name)
+ )
+ return value
+ else:
+ if isinstance(value, str):
+ return value
+ raise Exception('Key "{}" is expected to be a single string or list of nonempty strings'.format(key_name))
+
+ @classmethod
+ def _required_string_from_config(cls, cfg_json, key_name):
+ value = cfg_json.get(key_name, None)
+ if value is None or not isinstance(value, str) or not value:
+ raise Exception('Key "{}" is expected to be a nonempty string'.format(key_name))
+ return value
+
+ @classmethod
+ def _string_with_default_from_config(cls, cfg_json, key_name, default_value):
+ value = cfg_json.get(key_name, None)
+ if value is None:
+ return default_value
+ if not isinstance(value, str):
+ raise Exception('Key "{}" is expected to be a string'.format(key_name))
+ return value
+
+ @classmethod
+ def _duration_seconds_from_config(cls, cfg_json, key_name, default_value):
+ value = cfg_json.get(key_name, None)
+ if value is None:
+ return default_value
+ if not isinstance(value, str):
+ raise Exception('Key "{}" is expected to be a string'.format(key_name))
+ multiplier = 1
+ if value.endswith("s"):
+ multiplier = 1
+ value = value[:-1]
+ elif value.endswith("m"):
+ multiplier = 60
+ value = value[:-1]
+ elif value.endswith("h"):
+ multiplier = 3600
+ value = value[:-1]
+ elif value.endswith("d"):
+ multiplier = 3600 * 24
+ value = value[:-1]
+ elif value.endswith("ms"):
+ multiplier = 1.0 / 1000
+ value = value[:-2]
+ elif value.endswith("us"):
+ multiplier = 1.0 / 1000000
+ value = value[:-2]
+ elif value.endswith("ns"):
+ multiplier = 1.0 / 1000000000
+ value = value[:-2]
+ f = float(value)
+ if f < 0.0:
+ raise Exception("{}: negative duration is not allowed".format(value))
+ return int(f * multiplier)
+
+ @classmethod
+ def from_file(cls, cfg_file, iam_endpoint=None):
+ """
+ Create OAuth 2.0 token exchange protocol credentials from config file.
+
+ https://www.rfc-editor.org/rfc/rfc8693
+ Config file must be a valid json file
+
+ Fields of json file
+ grant-type: [string] Grant type option (default: "urn:ietf:params:oauth:grant-type:token-exchange")
+ res: [string | list of strings] Resource option (optional)
+ aud: [string | list of strings] Audience option for token exchange request (optional)
+ scope: [string | list of strings] Scope option (optional)
+ requested-token-type: [string] Requested token type option (default: "urn:ietf:params:oauth:token-type:access_token")
+ subject-credentials: [creds_json] Subject credentials options (optional)
+ actor-credentials: [creds_json] Actor credentials options (optional)
+ token-endpoint: [string] Token endpoint
+
+ Fields of creds_json (JWT):
+ type: [string] Token source type. Set JWT
+ alg: [string] Algorithm for JWT signature.
+ Supported algorithms can be listed
+ with GetSupportedOauth2TokenExchangeJwtAlgorithms()
+ private-key: [string] (Private) key in PEM format (RSA, EC) or Base64 format (HMAC) for JWT signature
+ kid: [string] Key id JWT standard claim (optional)
+ iss: [string] Issuer JWT standard claim (optional)
+ sub: [string] Subject JWT standard claim (optional)
+ aud: [string | list of strings] Audience JWT standard claim (optional)
+ jti: [string] JWT ID JWT standard claim (optional)
+ ttl: [string] Token TTL (default: 1h)
+
+ Fields of creds_json (FIXED):
+ type: [string] Token source type. Set FIXED
+ token: [string] Token value
+ token-type: [string] Token type value. It will become
+ subject_token_type/actor_token_type parameter
+ in token exchange request (https://www.rfc-editor.org/rfc/rfc8693)
+ """
+ with open(os.path.expanduser(cfg_file), "r") as r:
+ cfg = r.read()
+
+ return cls.from_content(cfg, iam_endpoint=iam_endpoint)
+
+ @classmethod
+ def from_content(cls, cfg, iam_endpoint=None):
+ try:
+ cfg_json = json.loads(cfg)
+ except Exception as ex:
+ raise Exception("Failed to parse json config: {}".format(ex))
+
+ if iam_endpoint is not None:
+ token_endpoint = iam_endpoint
+ else:
+ token_endpoint = cfg_json.get("token-endpoint", "")
+
+ subject_token_source = cls._token_source_from_config(cfg_json, "subject-credentials")
+ actor_token_source = cls._token_source_from_config(cfg_json, "actor-credentials")
+ audience = cls._list_of_strings_or_single_from_config(cfg_json, "aud")
+ scope = cls._list_of_strings_or_single_from_config(cfg_json, "scope")
+ resource = cls._list_of_strings_or_single_from_config(cfg_json, "res")
+ grant_type = cls._string_with_default_from_config(
+ cfg_json, "grant-type", "urn:ietf:params:oauth:grant-type:token-exchange"
+ )
+ requested_token_type = cls._string_with_default_from_config(
+ cfg_json, "requested-token-type", "urn:ietf:params:oauth:token-type:access_token"
+ )
+
+ return cls(
+ token_endpoint=token_endpoint,
+ subject_token_source=subject_token_source,
+ actor_token_source=actor_token_source,
+ audience=audience,
+ scope=scope,
+ resource=resource,
+ grant_type=grant_type,
+ requested_token_type=requested_token_type,
+ )
+
class Oauth2TokenExchangeCredentials(credentials.AbstractExpiringTokenCredentials, Oauth2TokenExchangeCredentialsBase):
def __init__(
@@ -103,7 +309,7 @@ class Oauth2TokenExchangeCredentials(credentials.AbstractExpiringTokenCredential
actor_token_source: typing.Optional[TokenSource] = None,
audience: typing.Union[typing.List[str], str, None] = None,
scope: typing.Union[typing.List[str], str, None] = None,
- resource: typing.Optional[str] = None,
+ resource: typing.Union[typing.List[str], str, None] = None,
grant_type: str = "urn:ietf:params:oauth:grant-type:token-exchange",
requested_token_type: str = "urn:ietf:params:oauth:token-type:access_token",
tracer=None,
diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py
new file mode 100644
index 0000000000..eb967abc20
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/query/__init__.py
@@ -0,0 +1,40 @@
+__all__ = [
+ "QueryOnlineReadOnly",
+ "QuerySerializableReadWrite",
+ "QuerySnapshotReadOnly",
+ "QueryStaleReadOnly",
+ "QuerySessionPool",
+ "QueryClientSync",
+ "QuerySessionSync",
+]
+
+import logging
+
+from .base import (
+ IQueryClient,
+ SupportedDriverType,
+ QueryClientSettings,
+)
+
+from .session import QuerySessionSync
+
+from .._grpc.grpcwrapper.ydb_query_public_types import (
+ QueryOnlineReadOnly,
+ QuerySerializableReadWrite,
+ QuerySnapshotReadOnly,
+ QueryStaleReadOnly,
+)
+
+from .pool import QuerySessionPool
+
+logger = logging.getLogger(__name__)
+
+
+class QueryClientSync(IQueryClient):
+ def __init__(self, driver: SupportedDriverType, query_client_settings: QueryClientSettings = None):
+ logger.warning("QueryClientSync is an experimental API, which could be changed.")
+ self._driver = driver
+ self._settings = query_client_settings
+
+ def session(self) -> QuerySessionSync:
+ return QuerySessionSync(self._driver, self._settings)
diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py
new file mode 100644
index 0000000000..e08d9f52d0
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/query/base.py
@@ -0,0 +1,388 @@
+import abc
+import enum
+import functools
+
+from typing import (
+ Iterator,
+ Optional,
+)
+
+from .._grpc.grpcwrapper.common_utils import (
+ SupportedDriverType,
+)
+from .._grpc.grpcwrapper import ydb_query
+from .._grpc.grpcwrapper.ydb_query_public_types import (
+ BaseQueryTxMode,
+)
+from ..connection import _RpcState as RpcState
+from .. import convert
+from .. import issues
+from .. import _utilities
+from .. import _apis
+
+
+class QuerySyntax(enum.IntEnum):
+ UNSPECIFIED = 0
+ YQL_V1 = 1
+ PG = 2
+
+
+class QueryExecMode(enum.IntEnum):
+ UNSPECIFIED = 0
+ PARSE = 10
+ VALIDATE = 20
+ EXPLAIN = 30
+ EXECUTE = 50
+
+
+class StatsMode(enum.IntEnum):
+ UNSPECIFIED = 0
+ NONE = 10
+ BASIC = 20
+ FULL = 30
+ PROFILE = 40
+
+
+class SyncResponseContextIterator(_utilities.SyncResponseIterator):
+ def __enter__(self) -> "SyncResponseContextIterator":
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ for _ in self:
+ pass
+
+
+class QueryClientSettings:
+ pass
+
+
+class IQuerySessionState(abc.ABC):
+ def __init__(self, settings: Optional[QueryClientSettings] = None):
+ pass
+
+ @abc.abstractmethod
+ def reset(self) -> None:
+ pass
+
+ @property
+ @abc.abstractmethod
+ def session_id(self) -> Optional[str]:
+ pass
+
+ @abc.abstractmethod
+ def set_session_id(self, session_id: str) -> "IQuerySessionState":
+ pass
+
+ @property
+ @abc.abstractmethod
+ def node_id(self) -> Optional[int]:
+ pass
+
+ @abc.abstractmethod
+ def set_node_id(self, node_id: int) -> "IQuerySessionState":
+ pass
+
+ @property
+ @abc.abstractmethod
+ def attached(self) -> bool:
+ pass
+
+ @abc.abstractmethod
+ def set_attached(self, attached: bool) -> "IQuerySessionState":
+ pass
+
+
+class IQuerySession(abc.ABC):
+ """Session object for Query Service. It is not recommended to control
+ session's lifecycle manually - use a QuerySessionPool is always a better choise.
+ """
+
+ @abc.abstractmethod
+ def __init__(self, driver: SupportedDriverType, settings: Optional[QueryClientSettings] = None):
+ pass
+
+ @abc.abstractmethod
+ def create(self) -> "IQuerySession":
+ """WARNING: This API is experimental and could be changed.
+
+ Creates a Session of Query Service on server side and attaches it.
+
+ :return: Session object.
+ """
+ pass
+
+ @abc.abstractmethod
+ def delete(self) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Deletes a Session of Query Service on server side and releases resources.
+
+ :return: None
+ """
+ pass
+
+ @abc.abstractmethod
+ def transaction(self, tx_mode: Optional[BaseQueryTxMode] = None) -> "IQueryTxContext":
+ """WARNING: This API is experimental and could be changed.
+
+ Creates a transaction context manager with specified transaction mode.
+
+ :param tx_mode: Transaction mode, which is a one from the following choises:
+ 1) QuerySerializableReadWrite() which is default mode;
+ 2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
+ 3) QuerySnapshotReadOnly();
+ 4) QueryStaleReadOnly().
+
+ :return: transaction context manager.
+ """
+ pass
+
+ @abc.abstractmethod
+ def execute(
+ self,
+ query: str,
+ syntax: Optional[QuerySyntax] = None,
+ exec_mode: Optional[QueryExecMode] = None,
+ parameters: Optional[dict] = None,
+ concurrent_result_sets: Optional[bool] = False,
+ ) -> Iterator:
+ """WARNING: This API is experimental and could be changed.
+
+ Sends a query to Query Service
+ :param query: (YQL or SQL text) to be executed.
+ :param syntax: Syntax of the query, which is a one from the following choises:
+ 1) QuerySyntax.YQL_V1, which is default;
+ 2) QuerySyntax.PG.
+ :param parameters: dict with parameters and YDB types;
+ :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+
+ :return: Iterator with result sets
+ """
+
+
+class IQueryTxContext(abc.ABC):
+ """
+ An object that provides a simple transaction context manager that allows statements execution
+ in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
+ transaction control logic, and opens new transaction if:
+ 1) By explicit .begin();
+ 2) On execution of a first statement, which is strictly recommended method, because that avoids
+ useless round trip
+
+ This context manager is not thread-safe, so you should not manipulate on it concurrently.
+ """
+
+ @abc.abstractmethod
+ def __init__(
+ self,
+ driver: SupportedDriverType,
+ session_state: IQuerySessionState,
+ session: IQuerySession,
+ tx_mode: BaseQueryTxMode,
+ ):
+ """
+ An object that provides a simple transaction context manager that allows statements execution
+ in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
+ transaction control logic, and opens new transaction if:
+
+ 1) By explicit .begin() method;
+ 2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip
+
+ This context manager is not thread-safe, so you should not manipulate on it concurrently.
+
+ :param driver: A driver instance
+ :param session_state: A state of session
+ :param tx_mode: Transaction mode, which is a one from the following choises:
+ 1) QuerySerializableReadWrite() which is default mode;
+ 2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
+ 3) QuerySnapshotReadOnly();
+ 4) QueryStaleReadOnly().
+ """
+ pass
+
+ @abc.abstractmethod
+ def __enter__(self) -> "IQueryTxContext":
+ """
+ Enters a context manager and returns a transaction
+
+ :return: A transaction instance
+ """
+ pass
+
+ @abc.abstractmethod
+ def __exit__(self, *args, **kwargs):
+ """
+ Closes a transaction context manager and rollbacks transaction if
+ it is not finished explicitly
+ """
+ pass
+
+ @property
+ @abc.abstractmethod
+ def session_id(self) -> str:
+ """
+ A transaction's session id
+
+ :return: A transaction's session id
+ """
+ pass
+
+ @property
+ @abc.abstractmethod
+ def tx_id(self) -> Optional[str]:
+ """
+ Returns an id of open transaction or None otherwise
+
+ :return: An id of open transaction or None otherwise
+ """
+ pass
+
+ @abc.abstractmethod
+ def begin(self, settings: Optional[QueryClientSettings] = None) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Explicitly begins a transaction
+
+ :param settings: A request settings
+
+ :return: None or exception if begin is failed
+ """
+ pass
+
+ @abc.abstractmethod
+ def commit(self, settings: Optional[QueryClientSettings] = None) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Calls commit on a transaction if it is open. If transaction execution
+ failed then this method raises PreconditionFailed.
+
+ :param settings: A request settings
+
+ :return: None or exception if commit is failed
+ """
+ pass
+
+ @abc.abstractmethod
+ def rollback(self, settings: Optional[QueryClientSettings] = None) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Calls rollback on a transaction if it is open. If transaction execution
+ failed then this method raises PreconditionFailed.
+
+ :param settings: A request settings
+
+ :return: None or exception if rollback is failed
+ """
+ pass
+
+ @abc.abstractmethod
+ def execute(
+ self,
+ query: str,
+ commit_tx: Optional[bool] = False,
+ syntax: Optional[QuerySyntax] = None,
+ exec_mode: Optional[QueryExecMode] = None,
+ parameters: Optional[dict] = None,
+ concurrent_result_sets: Optional[bool] = False,
+ ) -> Iterator:
+ """WARNING: This API is experimental and could be changed.
+
+ Sends a query to Query Service
+ :param query: (YQL or SQL text) to be executed.
+ :param commit_tx: A special flag that allows transaction commit.
+ :param syntax: Syntax of the query, which is a one from the following choises:
+ 1) QuerySyntax.YQL_V1, which is default;
+ 2) QuerySyntax.PG.
+ :param exec_mode: Exec mode of the query, which is a one from the following choises:
+ 1) QueryExecMode.EXECUTE, which is default;
+ 2) QueryExecMode.EXPLAIN;
+ 3) QueryExecMode.VALIDATE;
+ 4) QueryExecMode.PARSE.
+ :param parameters: dict with parameters and YDB types;
+ :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+
+ :return: Iterator with result sets
+ """
+ pass
+
+
+class IQueryClient(abc.ABC):
+ def __init__(self, driver: SupportedDriverType, query_client_settings: Optional[QueryClientSettings] = None):
+ pass
+
+ @abc.abstractmethod
+ def session(self) -> IQuerySession:
+ pass
+
+
+def create_execute_query_request(
+ query: str,
+ session_id: str,
+ tx_id: Optional[str],
+ commit_tx: Optional[bool],
+ tx_mode: Optional[BaseQueryTxMode],
+ syntax: Optional[QuerySyntax],
+ exec_mode: Optional[QueryExecMode],
+ parameters: Optional[dict],
+ concurrent_result_sets: Optional[bool],
+) -> ydb_query.ExecuteQueryRequest:
+ syntax = QuerySyntax.YQL_V1 if not syntax else syntax
+ exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
+ stats_mode = StatsMode.NONE # TODO: choise is not supported yet
+
+ tx_control = None
+ if not tx_id and not tx_mode:
+ tx_control = None
+ elif tx_id:
+ tx_control = ydb_query.TransactionControl(
+ tx_id=tx_id,
+ commit_tx=commit_tx,
+ begin_tx=None,
+ )
+ else:
+ tx_control = ydb_query.TransactionControl(
+ begin_tx=ydb_query.TransactionSettings(
+ tx_mode=tx_mode,
+ ),
+ commit_tx=commit_tx,
+ tx_id=None,
+ )
+
+ return ydb_query.ExecuteQueryRequest(
+ session_id=session_id,
+ query_content=ydb_query.QueryContent.from_public(
+ query=query,
+ syntax=syntax,
+ ),
+ tx_control=tx_control,
+ exec_mode=exec_mode,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ stats_mode=stats_mode,
+ )
+
+
+def wrap_execute_query_response(
+ rpc_state: RpcState,
+ response_pb: _apis.ydb_query.ExecuteQueryResponsePart,
+ tx: Optional[IQueryTxContext] = None,
+ commit_tx: Optional[bool] = False,
+) -> convert.ResultSet:
+ issues._process_response(response_pb)
+ if tx and response_pb.tx_meta and not tx.tx_id:
+ tx._move_to_beginned(response_pb.tx_meta.id)
+ if tx and commit_tx:
+ tx._move_to_commited()
+ return convert.ResultSet.from_message(response_pb.result_set)
+
+
+def bad_session_handler(func):
+ @functools.wraps(func)
+ def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args, **kwargs):
+ try:
+ return func(rpc_state, response_pb, session_state, *args, **kwargs)
+ except issues.BadSession:
+ session_state.reset()
+ raise
+
+ return decorator
diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py
new file mode 100644
index 0000000000..e7514cdf76
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/query/pool.py
@@ -0,0 +1,91 @@
+import logging
+from typing import (
+ Callable,
+ Optional,
+ List,
+)
+
+from . import base
+from .session import (
+ QuerySessionSync,
+)
+from ..retries import (
+ RetrySettings,
+ retry_operation_sync,
+)
+from .. import convert
+
+logger = logging.getLogger(__name__)
+
+
+class QuerySessionPool:
+ """QuerySessionPool is an object to simplify operations with sessions of Query Service."""
+
+ def __init__(self, driver: base.SupportedDriverType):
+ """
+ :param driver: A driver instance
+ """
+
+ logger.warning("QuerySessionPool is an experimental API, which could be changed.")
+ self._driver = driver
+
+ def checkout(self) -> "SimpleQuerySessionCheckout":
+ """WARNING: This API is experimental and could be changed.
+ Return a Session context manager, that opens session on enter and closes session on exit.
+ """
+
+ return SimpleQuerySessionCheckout(self)
+
+ def retry_operation_sync(self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs):
+ """WARNING: This API is experimental and could be changed.
+ Special interface to execute a bunch of commands with session in a safe, retriable way.
+
+ :param callee: A function, that works with session.
+ :param retry_settings: RetrySettings object.
+
+ :return: Result sets or exception in case of execution errors.
+ """
+
+ retry_settings = RetrySettings() if retry_settings is None else retry_settings
+
+ def wrapped_callee():
+ with self.checkout() as session:
+ return callee(session, *args, **kwargs)
+
+ return retry_operation_sync(wrapped_callee, retry_settings)
+
+ def execute_with_retries(
+ self, query: str, retry_settings: Optional[RetrySettings] = None, *args, **kwargs
+ ) -> List[convert.ResultSet]:
+ """WARNING: This API is experimental and could be changed.
+ Special interface to execute a one-shot queries in a safe, retriable way.
+ Note: this method loads all data from stream before return, do not use this
+ method with huge read queries.
+
+ :param query: A query, yql or sql text.
+ :param retry_settings: RetrySettings object.
+
+ :return: Result sets or exception in case of execution errors.
+ """
+
+ retry_settings = RetrySettings() if retry_settings is None else retry_settings
+
+ def wrapped_callee():
+ with self.checkout() as session:
+ it = session.execute(query, *args, **kwargs)
+ return [result_set for result_set in it]
+
+ return retry_operation_sync(wrapped_callee, retry_settings)
+
+
+class SimpleQuerySessionCheckout:
+ def __init__(self, pool: QuerySessionPool):
+ self._pool = pool
+ self._session = QuerySessionSync(pool._driver)
+
+ def __enter__(self) -> base.IQuerySession:
+ self._session.create()
+ return self._session
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._session.delete()
diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py
new file mode 100644
index 0000000000..d6034d348a
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/query/session.py
@@ -0,0 +1,317 @@
+import abc
+import enum
+import logging
+import threading
+from typing import (
+ Iterable,
+ Optional,
+)
+
+from . import base
+
+from .. import _apis, issues, _utilities
+from ..connection import _RpcState as RpcState
+from .._grpc.grpcwrapper import common_utils
+from .._grpc.grpcwrapper import ydb_query as _ydb_query
+from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
+
+from .transaction import BaseQueryTxContext
+
+
+logger = logging.getLogger(__name__)
+
+
+class QuerySessionStateEnum(enum.Enum):
+ NOT_INITIALIZED = "NOT_INITIALIZED"
+ CREATED = "CREATED"
+ CLOSED = "CLOSED"
+
+
+class QuerySessionStateHelper(abc.ABC):
+ _VALID_TRANSITIONS = {
+ QuerySessionStateEnum.NOT_INITIALIZED: [QuerySessionStateEnum.CREATED],
+ QuerySessionStateEnum.CREATED: [QuerySessionStateEnum.CLOSED],
+ QuerySessionStateEnum.CLOSED: [],
+ }
+
+ _READY_TO_USE = [
+ QuerySessionStateEnum.CREATED,
+ ]
+
+ @classmethod
+ def valid_transition(cls, before: QuerySessionStateEnum, after: QuerySessionStateEnum) -> bool:
+ return after in cls._VALID_TRANSITIONS[before]
+
+ @classmethod
+ def ready_to_use(cls, state: QuerySessionStateEnum) -> bool:
+ return state in cls._READY_TO_USE
+
+
+class QuerySessionState(base.IQuerySessionState):
+ _session_id: Optional[str] = None
+ _node_id: Optional[int] = None
+ _attached: bool = False
+ _settings: Optional[base.QueryClientSettings] = None
+ _state: QuerySessionStateEnum = QuerySessionStateEnum.NOT_INITIALIZED
+
+ def __init__(self, settings: base.QueryClientSettings = None):
+ self._settings = settings
+
+ def reset(self) -> None:
+ self._session_id = None
+ self._node_id = None
+ self._attached = False
+
+ @property
+ def session_id(self) -> Optional[str]:
+ return self._session_id
+
+ def set_session_id(self, session_id: str) -> "QuerySessionState":
+ self._session_id = session_id
+ return self
+
+ @property
+ def node_id(self) -> Optional[int]:
+ return self._node_id
+
+ def set_node_id(self, node_id: int) -> "QuerySessionState":
+ self._node_id = node_id
+ return self
+
+ @property
+ def attached(self) -> bool:
+ return self._attached
+
+ def set_attached(self, attached: bool) -> "QuerySessionState":
+ self._attached = attached
+
+ def _check_invalid_transition(self, target: QuerySessionStateEnum) -> None:
+ if not QuerySessionStateHelper.valid_transition(self._state, target):
+ raise RuntimeError(f"Session could not be moved from {self._state.value} to {target.value}")
+
+ def _change_state(self, target: QuerySessionStateEnum) -> None:
+ self._check_invalid_transition(target)
+ self._state = target
+
+ def _check_session_ready_to_use(self) -> None:
+ if not QuerySessionStateHelper.ready_to_use(self._state):
+ raise RuntimeError(f"Session is not ready to use, current state: {self._state.value}")
+
+ def _already_in(self, target) -> bool:
+ return self._state == target
+
+
+def wrapper_create_session(
+ rpc_state: RpcState,
+ response_pb: _apis.ydb_query.CreateSessionResponse,
+ session_state: QuerySessionState,
+ session: "BaseQuerySession",
+) -> "BaseQuerySession":
+ message = _ydb_query.CreateSessionResponse.from_proto(response_pb)
+ issues._process_response(message.status)
+ session_state.set_session_id(message.session_id).set_node_id(message.node_id)
+ return session
+
+
+def wrapper_delete_session(
+ rpc_state: RpcState,
+ response_pb: _apis.ydb_query.DeleteSessionResponse,
+ session_state: QuerySessionState,
+ session: "BaseQuerySession",
+) -> "BaseQuerySession":
+ message = _ydb_query.DeleteSessionResponse.from_proto(response_pb)
+ issues._process_response(message.status)
+ session_state.reset()
+ session_state._change_state(QuerySessionStateEnum.CLOSED)
+ return session
+
+
+class BaseQuerySession(base.IQuerySession):
+ _driver: base.SupportedDriverType
+ _settings: base.QueryClientSettings
+ _state: QuerySessionState
+
+ def __init__(self, driver: base.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
+ self._driver = driver
+ self._settings = settings if settings is not None else base.QueryClientSettings()
+ self._state = QuerySessionState(settings)
+
+ def _create_call(self) -> "BaseQuerySession":
+ return self._driver(
+ _apis.ydb_query.CreateSessionRequest(),
+ _apis.QueryService.Stub,
+ _apis.QueryService.CreateSession,
+ wrap_result=wrapper_create_session,
+ wrap_args=(self._state, self),
+ )
+
+ def _delete_call(self) -> "BaseQuerySession":
+ return self._driver(
+ _apis.ydb_query.DeleteSessionRequest(session_id=self._state.session_id),
+ _apis.QueryService.Stub,
+ _apis.QueryService.DeleteSession,
+ wrap_result=wrapper_delete_session,
+ wrap_args=(self._state, self),
+ )
+
+ def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
+ return self._driver(
+ _apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id),
+ _apis.QueryService.Stub,
+ _apis.QueryService.AttachSession,
+ )
+
+ def _execute_call(
+ self,
+ query: str,
+ commit_tx: bool = False,
+ syntax: base.QuerySyntax = None,
+ exec_mode: base.QueryExecMode = None,
+ parameters: dict = None,
+ concurrent_result_sets: bool = False,
+ ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
+ request = base.create_execute_query_request(
+ query=query,
+ session_id=self._state.session_id,
+ commit_tx=commit_tx,
+ tx_mode=None,
+ tx_id=None,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ )
+
+ return self._driver(
+ request.to_proto(),
+ _apis.QueryService.Stub,
+ _apis.QueryService.ExecuteQuery,
+ )
+
+
+class QuerySessionSync(BaseQuerySession):
+ """Session object for Query Service. It is not recommended to control
+ session's lifecycle manually - use a QuerySessionPool is always a better choise.
+ """
+
+ _stream = None
+
+ def _attach(self) -> None:
+ self._stream = self._attach_call()
+ status_stream = _utilities.SyncResponseIterator(
+ self._stream,
+ lambda response: common_utils.ServerStatus.from_proto(response),
+ )
+
+ first_response = next(status_stream)
+ if first_response.status != issues.StatusCode.SUCCESS:
+ pass
+
+ self._state.set_attached(True)
+ self._state._change_state(QuerySessionStateEnum.CREATED)
+
+ threading.Thread(
+ target=self._check_session_status_loop,
+ args=(status_stream,),
+ name="check session status thread",
+ daemon=True,
+ ).start()
+
+ def _check_session_status_loop(self, status_stream: _utilities.SyncResponseIterator) -> None:
+ try:
+ for status in status_stream:
+ if status.status != issues.StatusCode.SUCCESS:
+ self._state.reset()
+ self._state._change_state(QuerySessionStateEnum.CLOSED)
+ except Exception:
+ pass
+
+ def delete(self) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Deletes a Session of Query Service on server side and releases resources.
+
+ :return: None
+ """
+ if self._state._already_in(QuerySessionStateEnum.CLOSED):
+ return
+
+ self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED)
+ self._delete_call()
+ self._stream.cancel()
+
+ def create(self) -> "QuerySessionSync":
+ """WARNING: This API is experimental and could be changed.
+
+ Creates a Session of Query Service on server side and attaches it.
+
+ :return: QuerySessionSync object.
+ """
+ if self._state._already_in(QuerySessionStateEnum.CREATED):
+ return
+
+ self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
+ self._create_call()
+ self._attach()
+
+ return self
+
+ def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> base.IQueryTxContext:
+ """WARNING: This API is experimental and could be changed.
+
+ Creates a transaction context manager with specified transaction mode.
+ :param tx_mode: Transaction mode, which is a one from the following choises:
+ 1) QuerySerializableReadWrite() which is default mode;
+ 2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
+ 3) QuerySnapshotReadOnly();
+ 4) QueryStaleReadOnly().
+
+ :return transaction context manager.
+
+ """
+ self._state._check_session_ready_to_use()
+
+ tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
+
+ return BaseQueryTxContext(
+ self._driver,
+ self._state,
+ self,
+ tx_mode,
+ )
+
+ def execute(
+ self,
+ query: str,
+ syntax: base.QuerySyntax = None,
+ exec_mode: base.QueryExecMode = None,
+ parameters: dict = None,
+ concurrent_result_sets: bool = False,
+ ) -> base.SyncResponseContextIterator:
+ """WARNING: This API is experimental and could be changed.
+
+ Sends a query to Query Service
+ :param query: (YQL or SQL text) to be executed.
+ :param syntax: Syntax of the query, which is a one from the following choises:
+ 1) QuerySyntax.YQL_V1, which is default;
+ 2) QuerySyntax.PG.
+ :param parameters: dict with parameters and YDB types;
+ :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+
+ :return: Iterator with result sets
+ """
+ self._state._check_session_ready_to_use()
+
+ stream_it = self._execute_call(
+ query=query,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ )
+
+ return base.SyncResponseContextIterator(
+ stream_it,
+ lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp),
+ )
diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py
new file mode 100644
index 0000000000..0a49320293
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/query/transaction.py
@@ -0,0 +1,412 @@
+import abc
+import logging
+import enum
+import functools
+from typing import (
+ Iterable,
+ Optional,
+)
+
+from .. import (
+ _apis,
+ issues,
+)
+from .._grpc.grpcwrapper import ydb_query as _ydb_query
+from ..connection import _RpcState as RpcState
+
+from . import base
+
+logger = logging.getLogger(__name__)
+
+
+class QueryTxStateEnum(enum.Enum):
+ NOT_INITIALIZED = "NOT_INITIALIZED"
+ BEGINED = "BEGINED"
+ COMMITTED = "COMMITTED"
+ ROLLBACKED = "ROLLBACKED"
+ DEAD = "DEAD"
+
+
+class QueryTxStateHelper(abc.ABC):
+ _VALID_TRANSITIONS = {
+ QueryTxStateEnum.NOT_INITIALIZED: [
+ QueryTxStateEnum.BEGINED,
+ QueryTxStateEnum.DEAD,
+ QueryTxStateEnum.COMMITTED,
+ QueryTxStateEnum.ROLLBACKED,
+ ],
+ QueryTxStateEnum.BEGINED: [QueryTxStateEnum.COMMITTED, QueryTxStateEnum.ROLLBACKED, QueryTxStateEnum.DEAD],
+ QueryTxStateEnum.COMMITTED: [],
+ QueryTxStateEnum.ROLLBACKED: [],
+ QueryTxStateEnum.DEAD: [],
+ }
+
+ @classmethod
+ def valid_transition(cls, before: QueryTxStateEnum, after: QueryTxStateEnum) -> bool:
+ return after in cls._VALID_TRANSITIONS[before]
+
+ @classmethod
+ def terminal(cls, state: QueryTxStateEnum) -> bool:
+ return len(cls._VALID_TRANSITIONS[state]) == 0
+
+
+def reset_tx_id_handler(func):
+ @functools.wraps(func)
+ def decorator(
+ rpc_state, response_pb, session_state: base.IQuerySessionState, tx_state: QueryTxState, *args, **kwargs
+ ):
+ try:
+ return func(rpc_state, response_pb, session_state, tx_state, *args, **kwargs)
+ except issues.Error:
+ tx_state._change_state(QueryTxStateEnum.DEAD)
+ tx_state.tx_id = None
+ raise
+
+ return decorator
+
+
+class QueryTxState:
+ def __init__(self, tx_mode: base.BaseQueryTxMode):
+ """
+ Holds transaction context manager info
+ :param tx_mode: A mode of transaction
+ """
+ self.tx_id = None
+ self.tx_mode = tx_mode
+ self._state = QueryTxStateEnum.NOT_INITIALIZED
+
+ def _check_invalid_transition(self, target: QueryTxStateEnum) -> None:
+ if not QueryTxStateHelper.valid_transition(self._state, target):
+ raise RuntimeError(f"Transaction could not be moved from {self._state.value} to {target.value}")
+
+ def _change_state(self, target: QueryTxStateEnum) -> None:
+ self._check_invalid_transition(target)
+ self._state = target
+
+ def _check_tx_ready_to_use(self) -> None:
+ if QueryTxStateHelper.terminal(self._state):
+ raise RuntimeError(f"Transaction is in terminal state: {self._state.value}")
+
+ def _already_in(self, target: QueryTxStateEnum) -> bool:
+ return self._state == target
+
+
+def _construct_tx_settings(tx_state: QueryTxState) -> _ydb_query.TransactionSettings:
+ tx_settings = _ydb_query.TransactionSettings.from_public(tx_state.tx_mode)
+ return tx_settings
+
+
+def _create_begin_transaction_request(
+ session_state: base.IQuerySessionState, tx_state: QueryTxState
+) -> _apis.ydb_query.BeginTransactionRequest:
+ request = _ydb_query.BeginTransactionRequest(
+ session_id=session_state.session_id,
+ tx_settings=_construct_tx_settings(tx_state),
+ ).to_proto()
+ return request
+
+
+def _create_commit_transaction_request(
+ session_state: base.IQuerySessionState, tx_state: QueryTxState
+) -> _apis.ydb_query.CommitTransactionRequest:
+ request = _apis.ydb_query.CommitTransactionRequest()
+ request.tx_id = tx_state.tx_id
+ request.session_id = session_state.session_id
+ return request
+
+
+def _create_rollback_transaction_request(
+ session_state: base.IQuerySessionState, tx_state: QueryTxState
+) -> _apis.ydb_query.RollbackTransactionRequest:
+ request = _apis.ydb_query.RollbackTransactionRequest()
+ request.tx_id = tx_state.tx_id
+ request.session_id = session_state.session_id
+ return request
+
+
+@base.bad_session_handler
+def wrap_tx_begin_response(
+ rpc_state: RpcState,
+ response_pb: _apis.ydb_query.BeginTransactionResponse,
+ session_state: base.IQuerySessionState,
+ tx_state: QueryTxState,
+ tx: "BaseQueryTxContext",
+) -> "BaseQueryTxContext":
+ message = _ydb_query.BeginTransactionResponse.from_proto(response_pb)
+ issues._process_response(message.status)
+ tx_state._change_state(QueryTxStateEnum.BEGINED)
+ tx_state.tx_id = message.tx_meta.tx_id
+ return tx
+
+
+@base.bad_session_handler
+@reset_tx_id_handler
+def wrap_tx_commit_response(
+ rpc_state: RpcState,
+ response_pb: _apis.ydb_query.CommitTransactionResponse,
+ session_state: base.IQuerySessionState,
+ tx_state: QueryTxState,
+ tx: "BaseQueryTxContext",
+) -> "BaseQueryTxContext":
+ message = _ydb_query.CommitTransactionResponse.from_proto(response_pb)
+ issues._process_response(message.status)
+ tx_state._change_state(QueryTxStateEnum.COMMITTED)
+ return tx
+
+
+@base.bad_session_handler
+@reset_tx_id_handler
+def wrap_tx_rollback_response(
+ rpc_state: RpcState,
+ response_pb: _apis.ydb_query.RollbackTransactionResponse,
+ session_state: base.IQuerySessionState,
+ tx_state: QueryTxState,
+ tx: "BaseQueryTxContext",
+) -> "BaseQueryTxContext":
+ message = _ydb_query.RollbackTransactionResponse.from_proto(response_pb)
+ issues._process_response(message.status)
+ tx_state._change_state(QueryTxStateEnum.ROLLBACKED)
+ return tx
+
+
+class BaseQueryTxContext(base.IQueryTxContext):
+ def __init__(self, driver, session_state, session, tx_mode):
+ """
+ An object that provides a simple transaction context manager that allows statements execution
+ in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
+ transaction control logic, and opens new transaction if:
+
+ 1) By explicit .begin() method;
+ 2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip
+
+ This context manager is not thread-safe, so you should not manipulate on it concurrently.
+
+ :param driver: A driver instance
+ :param session_state: A state of session
+ :param tx_mode: Transaction mode, which is a one from the following choises:
+ 1) QuerySerializableReadWrite() which is default mode;
+ 2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
+ 3) QuerySnapshotReadOnly();
+ 4) QueryStaleReadOnly().
+ """
+
+ self._driver = driver
+ self._tx_state = QueryTxState(tx_mode)
+ self._session_state = session_state
+ self.session = session
+ self._prev_stream = None
+
+ def __enter__(self) -> "BaseQueryTxContext":
+ """
+ Enters a context manager and returns a transaction
+
+ :return: A transaction instance
+ """
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ """
+ Closes a transaction context manager and rollbacks transaction if
+ it is not finished explicitly
+ """
+ self._ensure_prev_stream_finished()
+ if self._tx_state._state == QueryTxStateEnum.BEGINED:
+ # It's strictly recommended to close transactions directly
+ # by using commit_tx=True flag while executing statement or by
+ # .commit() or .rollback() methods, but here we trying to do best
+ # effort to avoid useless open transactions
+ logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
+ try:
+ self.rollback()
+ except issues.Error:
+ logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)
+
+ @property
+ def session_id(self) -> str:
+ """
+ A transaction's session id
+
+ :return: A transaction's session id
+ """
+ return self._session_state.session_id
+
+ @property
+ def tx_id(self) -> Optional[str]:
+ """
+ Returns an id of open transaction or None otherwise
+
+ :return: An id of open transaction or None otherwise
+ """
+ return self._tx_state.tx_id
+
+ def _begin_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext":
+ return self._driver(
+ _create_begin_transaction_request(self._session_state, self._tx_state),
+ _apis.QueryService.Stub,
+ _apis.QueryService.BeginTransaction,
+ wrap_tx_begin_response,
+ settings,
+ (self._session_state, self._tx_state, self),
+ )
+
+ def _commit_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext":
+ return self._driver(
+ _create_commit_transaction_request(self._session_state, self._tx_state),
+ _apis.QueryService.Stub,
+ _apis.QueryService.CommitTransaction,
+ wrap_tx_commit_response,
+ settings,
+ (self._session_state, self._tx_state, self),
+ )
+
+ def _rollback_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext":
+ return self._driver(
+ _create_rollback_transaction_request(self._session_state, self._tx_state),
+ _apis.QueryService.Stub,
+ _apis.QueryService.RollbackTransaction,
+ wrap_tx_rollback_response,
+ settings,
+ (self._session_state, self._tx_state, self),
+ )
+
+ def _execute_call(
+ self,
+ query: str,
+ commit_tx: bool = False,
+ syntax: base.QuerySyntax = None,
+ exec_mode: base.QueryExecMode = None,
+ parameters: dict = None,
+ concurrent_result_sets: bool = False,
+ ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
+ request = base.create_execute_query_request(
+ query=query,
+ session_id=self._session_state.session_id,
+ commit_tx=commit_tx,
+ tx_id=self._tx_state.tx_id,
+ tx_mode=self._tx_state.tx_mode,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ )
+
+ return self._driver(
+ request.to_proto(),
+ _apis.QueryService.Stub,
+ _apis.QueryService.ExecuteQuery,
+ )
+
+ def _ensure_prev_stream_finished(self) -> None:
+ if self._prev_stream is not None:
+ for _ in self._prev_stream:
+ pass
+ self._prev_stream = None
+
+ def _move_to_beginned(self, tx_id: str) -> None:
+ if self._tx_state._already_in(QueryTxStateEnum.BEGINED):
+ return
+ self._tx_state._change_state(QueryTxStateEnum.BEGINED)
+ self._tx_state.tx_id = tx_id
+
+ def _move_to_commited(self) -> None:
+ if self._tx_state._already_in(QueryTxStateEnum.COMMITTED):
+ return
+ self._tx_state._change_state(QueryTxStateEnum.COMMITTED)
+
+ def begin(self, settings: Optional[base.QueryClientSettings] = None) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Explicitly begins a transaction
+
+ :param settings: A request settings
+
+ :return: None or exception if begin is failed
+ """
+ self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED)
+
+ self._begin_call(settings)
+
+ def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Calls commit on a transaction if it is open otherwise is no-op. If transaction execution
+ failed then this method raises PreconditionFailed.
+
+ :param settings: A request settings
+
+ :return: A committed transaction or exception if commit is failed
+ """
+ if self._tx_state._already_in(QueryTxStateEnum.COMMITTED):
+ return
+ self._ensure_prev_stream_finished()
+
+ if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED:
+ self._tx_state._change_state(QueryTxStateEnum.COMMITTED)
+ return
+
+ self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED)
+
+ self._commit_call(settings)
+
+ def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None:
+ if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED):
+ return
+
+ self._ensure_prev_stream_finished()
+
+ if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED:
+ self._tx_state._change_state(QueryTxStateEnum.ROLLBACKED)
+ return
+
+ self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED)
+
+ self._rollback_call(settings)
+
+ def execute(
+ self,
+ query: str,
+ commit_tx: Optional[bool] = False,
+ syntax: Optional[base.QuerySyntax] = None,
+ exec_mode: Optional[base.QueryExecMode] = None,
+ parameters: Optional[dict] = None,
+ concurrent_result_sets: Optional[bool] = False,
+ ) -> base.SyncResponseContextIterator:
+ """WARNING: This API is experimental and could be changed.
+
+ Sends a query to Query Service
+ :param query: (YQL or SQL text) to be executed.
+ :param commit_tx: A special flag that allows transaction commit.
+ :param syntax: Syntax of the query, which is a one from the following choises:
+ 1) QuerySyntax.YQL_V1, which is default;
+ 2) QuerySyntax.PG.
+ :param exec_mode: Exec mode of the query, which is a one from the following choises:
+ 1) QueryExecMode.EXECUTE, which is default;
+ 2) QueryExecMode.EXPLAIN;
+ 3) QueryExecMode.VALIDATE;
+ 4) QueryExecMode.PARSE.
+ :param parameters: dict with parameters and YDB types;
+ :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
+
+ :return: Iterator with result sets
+ """
+ self._ensure_prev_stream_finished()
+ self._tx_state._check_tx_ready_to_use()
+
+ stream_it = self._execute_call(
+ query=query,
+ commit_tx=commit_tx,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ )
+ self._prev_stream = base.SyncResponseContextIterator(
+ stream_it,
+ lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ tx=self,
+ commit_tx=commit_tx,
+ ),
+ )
+ return self._prev_stream
diff --git a/contrib/python/ydb/py3/ydb/retries.py b/contrib/python/ydb/py3/ydb/retries.py
new file mode 100644
index 0000000000..5d4f6e6a0f
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/retries.py
@@ -0,0 +1,136 @@
+import random
+import time
+
+from . import issues
+from ._errors import check_retriable_error
+
+
+class BackoffSettings(object):
+ def __init__(self, ceiling=6, slot_duration=0.001, uncertain_ratio=0.5):
+ self.ceiling = ceiling
+ self.slot_duration = slot_duration
+ self.uncertain_ratio = uncertain_ratio
+
+ def calc_timeout(self, retry_number):
+ slots_count = 1 << min(retry_number, self.ceiling)
+ max_duration_ms = slots_count * self.slot_duration * 1000.0
+ # duration_ms = random.random() * max_duration_ms * uncertain_ratio) + max_duration_ms * (1 - uncertain_ratio)
+ duration_ms = max_duration_ms * (random.random() * self.uncertain_ratio + 1.0 - self.uncertain_ratio)
+ return duration_ms / 1000.0
+
+
+class RetrySettings(object):
+ def __init__(
+ self,
+ max_retries=10,
+ max_session_acquire_timeout=None,
+ on_ydb_error_callback=None,
+ backoff_ceiling=6,
+ backoff_slot_duration=1,
+ get_session_client_timeout=5,
+ fast_backoff_settings=None,
+ slow_backoff_settings=None,
+ idempotent=False,
+ ):
+ self.max_retries = max_retries
+ self.max_session_acquire_timeout = max_session_acquire_timeout
+ self.on_ydb_error_callback = (lambda e: None) if on_ydb_error_callback is None else on_ydb_error_callback
+ self.fast_backoff = BackoffSettings(10, 0.005) if fast_backoff_settings is None else fast_backoff_settings
+ self.slow_backoff = (
+ BackoffSettings(backoff_ceiling, backoff_slot_duration)
+ if slow_backoff_settings is None
+ else slow_backoff_settings
+ )
+ self.retry_not_found = True
+ self.idempotent = idempotent
+ self.retry_internal_error = True
+ self.unknown_error_handler = lambda e: None
+ self.get_session_client_timeout = get_session_client_timeout
+ if max_session_acquire_timeout is not None:
+ self.get_session_client_timeout = min(self.max_session_acquire_timeout, self.get_session_client_timeout)
+
+ def with_fast_backoff(self, backoff_settings):
+ self.fast_backoff = backoff_settings
+ return self
+
+ def with_slow_backoff(self, backoff_settings):
+ self.slow_backoff = backoff_settings
+ return self
+
+
+class YdbRetryOperationSleepOpt(object):
+ def __init__(self, timeout):
+ self.timeout = timeout
+
+ def __eq__(self, other):
+ return type(self) == type(other) and self.timeout == other.timeout
+
+ def __repr__(self):
+ return "YdbRetryOperationSleepOpt(%s)" % self.timeout
+
+
+class YdbRetryOperationFinalResult(object):
+ def __init__(self, result):
+ self.result = result
+ self.exc = None
+
+ def __eq__(self, other):
+ return type(self) == type(other) and self.result == other.result and self.exc == other.exc
+
+ def __repr__(self):
+ return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc)
+
+ def set_exception(self, exc):
+ self.exc = exc
+
+
+def retry_operation_impl(callee, retry_settings=None, *args, **kwargs):
+ retry_settings = RetrySettings() if retry_settings is None else retry_settings
+ status = None
+
+ for attempt in range(retry_settings.max_retries + 1):
+ try:
+ result = YdbRetryOperationFinalResult(callee(*args, **kwargs))
+ yield result
+
+ if result.exc is not None:
+ raise result.exc
+
+ except issues.Error as e:
+ status = e
+ retry_settings.on_ydb_error_callback(e)
+
+ retriable_info = check_retriable_error(e, retry_settings, attempt)
+ if not retriable_info.is_retriable:
+ raise
+
+ skip_yield_error_types = [
+ issues.Aborted,
+ issues.BadSession,
+ issues.NotFound,
+ issues.InternalError,
+ ]
+
+ yield_sleep = True
+ for t in skip_yield_error_types:
+ if isinstance(e, t):
+ yield_sleep = False
+
+ if yield_sleep:
+ yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)
+
+ except Exception as e:
+ # you should provide your own handler you want
+ retry_settings.unknown_error_handler(e)
+ raise
+
+ raise status
+
+
+def retry_operation_sync(callee, retry_settings=None, *args, **kwargs):
+ opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
+ for next_opt in opt_generator:
+ if isinstance(next_opt, YdbRetryOperationSleepOpt):
+ time.sleep(next_opt.timeout)
+ else:
+ return next_opt.result
diff --git a/contrib/python/ydb/py3/ydb/table.py b/contrib/python/ydb/py3/ydb/table.py
index c21392bb4c..ac9f93042c 100644
--- a/contrib/python/ydb/py3/ydb/table.py
+++ b/contrib/python/ydb/py3/ydb/table.py
@@ -3,8 +3,6 @@ import abc
import ydb
from abc import abstractmethod
import logging
-import time
-import random
import enum
from . import (
@@ -20,7 +18,15 @@ from . import (
_tx_ctx_impl,
tracing,
)
-from ._errors import check_retriable_error
+
+from .retries import (
+ YdbRetryOperationFinalResult, # noqa
+ YdbRetryOperationSleepOpt, # noqa
+ BackoffSettings, # noqa
+ retry_operation_impl, # noqa
+ RetrySettings,
+ retry_operation_sync,
+)
try:
from . import interceptor
@@ -840,137 +846,6 @@ class StaleReadOnly(AbstractTransactionModeBuilder):
return self._name
-class BackoffSettings(object):
- def __init__(self, ceiling=6, slot_duration=0.001, uncertain_ratio=0.5):
- self.ceiling = ceiling
- self.slot_duration = slot_duration
- self.uncertain_ratio = uncertain_ratio
-
- def calc_timeout(self, retry_number):
- slots_count = 1 << min(retry_number, self.ceiling)
- max_duration_ms = slots_count * self.slot_duration * 1000.0
- # duration_ms = random.random() * max_duration_ms * uncertain_ratio) + max_duration_ms * (1 - uncertain_ratio)
- duration_ms = max_duration_ms * (random.random() * self.uncertain_ratio + 1.0 - self.uncertain_ratio)
- return duration_ms / 1000.0
-
-
-class RetrySettings(object):
- def __init__(
- self,
- max_retries=10,
- max_session_acquire_timeout=None,
- on_ydb_error_callback=None,
- backoff_ceiling=6,
- backoff_slot_duration=1,
- get_session_client_timeout=5,
- fast_backoff_settings=None,
- slow_backoff_settings=None,
- idempotent=False,
- ):
- self.max_retries = max_retries
- self.max_session_acquire_timeout = max_session_acquire_timeout
- self.on_ydb_error_callback = (lambda e: None) if on_ydb_error_callback is None else on_ydb_error_callback
- self.fast_backoff = BackoffSettings(10, 0.005) if fast_backoff_settings is None else fast_backoff_settings
- self.slow_backoff = (
- BackoffSettings(backoff_ceiling, backoff_slot_duration)
- if slow_backoff_settings is None
- else slow_backoff_settings
- )
- self.retry_not_found = True
- self.idempotent = idempotent
- self.retry_internal_error = True
- self.unknown_error_handler = lambda e: None
- self.get_session_client_timeout = get_session_client_timeout
- if max_session_acquire_timeout is not None:
- self.get_session_client_timeout = min(self.max_session_acquire_timeout, self.get_session_client_timeout)
-
- def with_fast_backoff(self, backoff_settings):
- self.fast_backoff = backoff_settings
- return self
-
- def with_slow_backoff(self, backoff_settings):
- self.slow_backoff = backoff_settings
- return self
-
-
-class YdbRetryOperationSleepOpt(object):
- def __init__(self, timeout):
- self.timeout = timeout
-
- def __eq__(self, other):
- return type(self) == type(other) and self.timeout == other.timeout
-
- def __repr__(self):
- return "YdbRetryOperationSleepOpt(%s)" % self.timeout
-
-
-class YdbRetryOperationFinalResult(object):
- def __init__(self, result):
- self.result = result
- self.exc = None
-
- def __eq__(self, other):
- return type(self) == type(other) and self.result == other.result and self.exc == other.exc
-
- def __repr__(self):
- return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc)
-
- def set_exception(self, exc):
- self.exc = exc
-
-
-def retry_operation_impl(callee, retry_settings=None, *args, **kwargs):
- retry_settings = RetrySettings() if retry_settings is None else retry_settings
- status = None
-
- for attempt in range(retry_settings.max_retries + 1):
- try:
- result = YdbRetryOperationFinalResult(callee(*args, **kwargs))
- yield result
-
- if result.exc is not None:
- raise result.exc
-
- except issues.Error as e:
- status = e
- retry_settings.on_ydb_error_callback(e)
-
- retriable_info = check_retriable_error(e, retry_settings, attempt)
- if not retriable_info.is_retriable:
- raise
-
- skip_yield_error_types = [
- issues.Aborted,
- issues.BadSession,
- issues.NotFound,
- issues.InternalError,
- ]
-
- yield_sleep = True
- for t in skip_yield_error_types:
- if isinstance(e, t):
- yield_sleep = False
-
- if yield_sleep:
- yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)
-
- except Exception as e:
- # you should provide your own handler you want
- retry_settings.unknown_error_handler(e)
- raise
-
- raise status
-
-
-def retry_operation_sync(callee, retry_settings=None, *args, **kwargs):
- opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
- for next_opt in opt_generator:
- if isinstance(next_opt, YdbRetryOperationSleepOpt):
- time.sleep(next_opt.timeout)
- else:
- return next_opt.result
-
-
class TableClientSettings(object):
def __init__(self):
self._client_query_cache_enabled = False
diff --git a/contrib/python/ydb/py3/ydb/types.py b/contrib/python/ydb/py3/ydb/types.py
index 49792ed39e..a48548640c 100644
--- a/contrib/python/ydb/py3/ydb/types.py
+++ b/contrib/python/ydb/py3/ydb/types.py
@@ -2,6 +2,7 @@
from __future__ import annotations
import abc
+from dataclasses import dataclass
import enum
import json
from . import _utilities, _apis
@@ -440,3 +441,9 @@ class BulkUpsertColumns(AbstractTypeBuilder):
def __str__(self):
return "BulkUpsertColumns<%s>" % ",".join(self.__columns_repr)
+
+
+@dataclass
+class TypedValue:
+ value: typing.Any
+ value_type: typing.Optional[typing.Union[PrimitiveType, AbstractTypeBuilder]] = None
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index 2edc0629d7..567cda12c7 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.13.0"
+VERSION = "3.15.0"