diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-08-09 17:57:55 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-08-09 18:56:25 +0300 |
commit | 88da31b9c96e50f21978fb1a45b3fd273f1b6dce (patch) | |
tree | b9181add08a7b1ab8f256a7b1bb7d0a9c33c3595 /contrib/python | |
parent | c3665c2967de61ecb4751064a3aa0284ba5f11de (diff) | |
download | ydb-88da31b9c96e50f21978fb1a45b3fd273f1b6dce.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/python')
22 files changed, 1994 insertions, 166 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 64e6538edb..5b155408fc 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.13.0 +Version: 3.15.0 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index d0fb350d8c..cb2a88454a 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.13.0) +VERSION(3.15.0) LICENSE(Apache-2.0) @@ -29,6 +29,8 @@ PY_SRCS( ydb/_grpc/common/__init__.py ydb/_grpc/grpcwrapper/__init__.py ydb/_grpc/grpcwrapper/common_utils.py + ydb/_grpc/grpcwrapper/ydb_query.py + ydb/_grpc/grpcwrapper/ydb_query_public_types.py ydb/_grpc/grpcwrapper/ydb_scheme.py ydb/_grpc/grpcwrapper/ydb_topic.py ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -68,6 +70,7 @@ PY_SRCS( ydb/dbapi/errors.py ydb/default_pem.py ydb/draft/__init__.py + ydb/draft/_apis.py ydb/draft/dynamic_config.py ydb/driver.py ydb/export.py @@ -82,7 +85,13 @@ PY_SRCS( ydb/oauth2_token_exchange/token_source.py ydb/operation.py ydb/pool.py + ydb/query/__init__.py + ydb/query/base.py + ydb/query/pool.py + ydb/query/session.py + ydb/query/transaction.py ydb/resolver.py + ydb/retries.py ydb/scheme.py ydb/scripting.py ydb/settings.py diff --git a/contrib/python/ydb/py3/ydb/__init__.py b/contrib/python/ydb/py3/ydb/__init__.py index 8bfba3a902..db4f6e607f 100644 --- a/contrib/python/ydb/py3/ydb/__init__.py +++ b/contrib/python/ydb/py3/ydb/__init__.py @@ -2,6 +2,10 @@ from pkgutil import extend_path __path__ = extend_path(__path__, __name__) +from .ydb_version import VERSION + +__version__ = VERSION + from .credentials import * # noqa from .driver import * # noqa from .global_settings import * # noqa @@ -19,6 +23,8 @@ from .import_client import * # noqa from .tracing import * # noqa from .topic import * # noqa from .draft import * # noqa +from .query import * # noqa +from .retries import * # noqa try: import ydb.aio as aio # noqa diff --git a/contrib/python/ydb/py3/ydb/_apis.py b/contrib/python/ydb/py3/ydb/_apis.py index e340150996..fc28d0ceb2 100644 --- a/contrib/python/ydb/py3/ydb/_apis.py +++ b/contrib/python/ydb/py3/ydb/_apis.py @@ -9,10 +9,7 @@ try: ydb_table_v1_pb2_grpc, ydb_operation_v1_pb2_grpc, ydb_topic_v1_pb2_grpc, - ) - - from ydb.public.api.grpc.draft import ( - ydb_dynamic_config_v1_pb2_grpc, + ydb_query_v1_pb2_grpc, ) from ydb.public.api.protos import ( @@ -23,11 +20,9 @@ try: ydb_value_pb2, ydb_operation_pb2, ydb_common_pb2, + ydb_query_pb2, ) - from ydb.public.api.protos.draft import ( - ydb_dynamic_config_pb2, - ) except ImportError: from contrib.ydb.public.api.grpc import ( ydb_cms_v1_pb2_grpc, @@ -36,10 +31,7 @@ except ImportError: ydb_table_v1_pb2_grpc, ydb_operation_v1_pb2_grpc, ydb_topic_v1_pb2_grpc, - ) - - from contrib.ydb.public.api.grpc.draft import ( - ydb_dynamic_config_v1_pb2_grpc, + ydb_query_v1_pb2_grpc, ) from contrib.ydb.public.api.protos import ( @@ -50,10 +42,7 @@ except ImportError: ydb_value_pb2, ydb_operation_pb2, ydb_common_pb2, - ) - - from contrib.ydb.public.api.protos.draft import ( - ydb_dynamic_config_pb2, + ydb_query_pb2, ) @@ -65,7 +54,7 @@ ydb_scheme = ydb_scheme_pb2 ydb_table = ydb_table_pb2 ydb_discovery = ydb_discovery_pb2 ydb_operation = ydb_operation_pb2 -ydb_dynamic_config = ydb_dynamic_config_pb2 +ydb_query = ydb_query_pb2 class CmsService(object): @@ -128,10 +117,17 @@ class TopicService(object): StreamWrite = "StreamWrite" -class DynamicConfigService(object): - Stub = ydb_dynamic_config_v1_pb2_grpc.DynamicConfigServiceStub +class QueryService(object): + Stub = ydb_query_v1_pb2_grpc.QueryServiceStub + + CreateSession = "CreateSession" + DeleteSession = "DeleteSession" + AttachSession = "AttachSession" + + BeginTransaction = "BeginTransaction" + CommitTransaction = "CommitTransaction" + RollbackTransaction = "RollbackTransaction" - ReplaceConfig = "ReplaceConfig" - SetConfig = "SetConfig" - GetConfig = "GetConfig" - GetNodeLabels = "GetNodeLabels" + ExecuteQuery = "ExecuteQuery" + ExecuteScript = "ExecuteScript" + FetchScriptResults = "FetchScriptResults" diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query.py new file mode 100644 index 0000000000..8fc09b0072 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query.py @@ -0,0 +1,181 @@ +from dataclasses import dataclass +import typing +from typing import Optional + + +try: + from ydb.public.api.protos import ydb_query_pb2 +except ImportError: + from contrib.ydb.public.api.protos import ydb_query_pb2 + +from . import ydb_query_public_types as public_types + +from .common_utils import ( + IFromProto, + IToProto, + IFromPublic, + ServerStatus, +) + +from ... import convert + + +@dataclass +class CreateSessionResponse(IFromProto): + status: ServerStatus + session_id: str + node_id: int + + @staticmethod + def from_proto(msg: ydb_query_pb2.CreateSessionResponse) -> "CreateSessionResponse": + return CreateSessionResponse( + status=ServerStatus(msg.status, msg.issues), + session_id=msg.session_id, + node_id=msg.node_id, + ) + + +@dataclass +class DeleteSessionResponse(IFromProto): + status: ServerStatus + + @staticmethod + def from_proto(msg: ydb_query_pb2.DeleteSessionResponse) -> "DeleteSessionResponse": + return DeleteSessionResponse(status=ServerStatus(msg.status, msg.issues)) + + +@dataclass +class AttachSessionRequest(IToProto): + session_id: str + + def to_proto(self) -> ydb_query_pb2.AttachSessionRequest: + return ydb_query_pb2.AttachSessionRequest(session_id=self.session_id) + + +@dataclass +class TransactionMeta(IFromProto): + tx_id: str + + @staticmethod + def from_proto(msg: ydb_query_pb2.TransactionMeta) -> "TransactionMeta": + return TransactionMeta(tx_id=msg.id) + + +@dataclass +class TransactionSettings(IFromPublic, IToProto): + tx_mode: public_types.BaseQueryTxMode + + @staticmethod + def from_public(tx_mode: public_types.BaseQueryTxMode) -> "TransactionSettings": + return TransactionSettings(tx_mode=tx_mode) + + def to_proto(self) -> ydb_query_pb2.TransactionSettings: + if self.tx_mode.name == "snapshot_read_only": + return ydb_query_pb2.TransactionSettings(snapshot_read_only=self.tx_mode.to_proto()) + if self.tx_mode.name == "serializable_read_write": + return ydb_query_pb2.TransactionSettings(serializable_read_write=self.tx_mode.to_proto()) + if self.tx_mode.name == "online_read_only": + return ydb_query_pb2.TransactionSettings(online_read_only=self.tx_mode.to_proto()) + if self.tx_mode.name == "stale_read_only": + return ydb_query_pb2.TransactionSettings(stale_read_only=self.tx_mode.to_proto()) + + +@dataclass +class BeginTransactionRequest(IToProto): + session_id: str + tx_settings: TransactionSettings + + def to_proto(self) -> ydb_query_pb2.BeginTransactionRequest: + return ydb_query_pb2.BeginTransactionRequest( + session_id=self.session_id, + tx_settings=self.tx_settings.to_proto(), + ) + + +@dataclass +class BeginTransactionResponse(IFromProto): + status: Optional[ServerStatus] + tx_meta: TransactionMeta + + @staticmethod + def from_proto(msg: ydb_query_pb2.BeginTransactionResponse) -> "BeginTransactionResponse": + return BeginTransactionResponse( + status=ServerStatus(msg.status, msg.issues), + tx_meta=TransactionMeta.from_proto(msg.tx_meta), + ) + + +@dataclass +class CommitTransactionResponse(IFromProto): + status: Optional[ServerStatus] + + @staticmethod + def from_proto(msg: ydb_query_pb2.CommitTransactionResponse) -> "CommitTransactionResponse": + return CommitTransactionResponse( + status=ServerStatus(msg.status, msg.issues), + ) + + +@dataclass +class RollbackTransactionResponse(IFromProto): + status: Optional[ServerStatus] + + @staticmethod + def from_proto(msg: ydb_query_pb2.RollbackTransactionResponse) -> "RollbackTransactionResponse": + return RollbackTransactionResponse( + status=ServerStatus(msg.status, msg.issues), + ) + + +@dataclass +class QueryContent(IFromPublic, IToProto): + text: str + syntax: int + + @staticmethod + def from_public(query: str, syntax: int) -> "QueryContent": + return QueryContent(text=query, syntax=syntax) + + def to_proto(self) -> ydb_query_pb2.QueryContent: + return ydb_query_pb2.QueryContent(text=self.text, syntax=self.syntax) + + +@dataclass +class TransactionControl(IToProto): + begin_tx: Optional[TransactionSettings] + commit_tx: Optional[bool] + tx_id: Optional[str] + + def to_proto(self) -> ydb_query_pb2.TransactionControl: + if self.tx_id: + return ydb_query_pb2.TransactionControl( + tx_id=self.tx_id, + commit_tx=self.commit_tx, + ) + return ydb_query_pb2.TransactionControl( + begin_tx=self.begin_tx.to_proto(), + commit_tx=self.commit_tx, + ) + + +@dataclass +class ExecuteQueryRequest(IToProto): + session_id: str + query_content: QueryContent + tx_control: TransactionControl + concurrent_result_sets: bool + exec_mode: int + parameters: dict + stats_mode: int + + def to_proto(self) -> ydb_query_pb2.ExecuteQueryRequest: + tx_control = self.tx_control.to_proto() if self.tx_control is not None else self.tx_control + return ydb_query_pb2.ExecuteQueryRequest( + session_id=self.session_id, + tx_control=tx_control, + query_content=self.query_content.to_proto(), + exec_mode=self.exec_mode, + stats_mode=self.stats_mode, + concurrent_result_sets=self.concurrent_result_sets, + parameters=convert.query_parameters_to_pb(self.parameters), + ) diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py new file mode 100644 index 0000000000..3ef2d55430 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py @@ -0,0 +1,65 @@ +import abc +import typing + +from .common_utils import IToProto + +try: + from ydb.public.api.protos import ydb_query_pb2 +except ImportError: + from contrib.ydb.public.api.protos import ydb_query_pb2 + + +class BaseQueryTxMode(IToProto): + @property + @abc.abstractmethod + def name(self) -> str: + pass + + +class QuerySnapshotReadOnly(BaseQueryTxMode): + def __init__(self): + self._name = "snapshot_read_only" + + @property + def name(self) -> str: + return self._name + + def to_proto(self) -> ydb_query_pb2.SnapshotModeSettings: + return ydb_query_pb2.SnapshotModeSettings() + + +class QuerySerializableReadWrite(BaseQueryTxMode): + def __init__(self): + self._name = "serializable_read_write" + + @property + def name(self) -> str: + return self._name + + def to_proto(self) -> ydb_query_pb2.SerializableModeSettings: + return ydb_query_pb2.SerializableModeSettings() + + +class QueryOnlineReadOnly(BaseQueryTxMode): + def __init__(self, allow_inconsistent_reads: bool = False): + self.allow_inconsistent_reads = allow_inconsistent_reads + self._name = "online_read_only" + + @property + def name(self): + return self._name + + def to_proto(self) -> ydb_query_pb2.OnlineModeSettings: + return ydb_query_pb2.OnlineModeSettings(allow_inconsistent_reads=self.allow_inconsistent_reads) + + +class QueryStaleReadOnly(BaseQueryTxMode): + def __init__(self): + self._name = "stale_read_only" + + @property + def name(self): + return self._name + + def to_proto(self) -> ydb_query_pb2.StaleModeSettings: + return ydb_query_pb2.StaleModeSettings() diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py index 17fb288555..b907ee2794 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py @@ -10,7 +10,7 @@ from typing import ( Callable, ) -from ..table import RetrySettings +from ..retries import RetrySettings from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py index 007c8a54b5..585e88abd7 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py @@ -26,9 +26,9 @@ from .topic_writer import ( from .. import ( _apis, issues, - check_retriable_error, - RetrySettings, ) +from .._errors import check_retriable_error +from ..retries import RetrySettings from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec from .._grpc.grpcwrapper.ydb_topic import ( UpdateTokenRequest, diff --git a/contrib/python/ydb/py3/ydb/convert.py b/contrib/python/ydb/py3/ydb/convert.py index 6c4164bc44..63a5dbe4e3 100644 --- a/contrib/python/ydb/py3/ydb/convert.py +++ b/contrib/python/ydb/py3/ydb/convert.py @@ -281,6 +281,66 @@ def parameters_to_pb(parameters_types, parameters_values): return param_values_pb +def query_parameters_to_pb(parameters): + if parameters is None or not parameters: + return {} + + parameters_types = {} + parameters_values = {} + for name, value in parameters.items(): + if isinstance(value, types.TypedValue): + if value.value_type is None: + value.value_type = _type_from_python_native(value.value) + elif isinstance(value, tuple): + value = types.TypedValue(*value) + else: + value = types.TypedValue(value, _type_from_python_native(value)) + + parameters_values[name] = value.value + parameters_types[name] = value.value_type + + return parameters_to_pb(parameters_types, parameters_values) + + +_from_python_type_map = { + int: types.PrimitiveType.Int64, + float: types.PrimitiveType.Float, + bool: types.PrimitiveType.Bool, + str: types.PrimitiveType.Utf8, +} + + +def _type_from_python_native(value): + t = type(value) + + if t in _from_python_type_map: + return _from_python_type_map[t] + + if t == list: + if len(value) == 0: + raise ValueError( + "Could not map empty list to any type, please specify " + "it manually by tuple(value, type) or ydb.TypedValue" + ) + entry_type = _type_from_python_native(value[0]) + return types.ListType(entry_type) + + if t == dict: + if len(value) == 0: + raise ValueError( + "Could not map empty dict to any type, please specify " + "it manually by tuple(value, type) or ydb.TypedValue" + ) + entry = list(value.items())[0] + key_type = _type_from_python_native(entry[0]) + value_type = _type_from_python_native(entry[1]) + return types.DictType(key_type, value_type) + + raise ValueError( + "Could not map value to any type, please specify it manually by tuple(value, type) or ydb.TypedValue" + ) + + def _unwrap_optionality(column): c_type = column.type current_type = c_type.WhichOneof("type") diff --git a/contrib/python/ydb/py3/ydb/draft/_apis.py b/contrib/python/ydb/py3/ydb/draft/_apis.py new file mode 100644 index 0000000000..8c9b56a065 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/draft/_apis.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +import typing + +try: + from ydb.public.api.grpc.draft import ( + ydb_dynamic_config_v1_pb2_grpc, + ) + + from ydb.public.api.protos.draft import ( + ydb_dynamic_config_pb2, + ) +except ImportError: + from contrib.ydb.public.api.grpc.draft import ( + ydb_dynamic_config_v1_pb2_grpc, + ) + + from contrib.ydb.public.api.protos.draft import ( + ydb_dynamic_config_pb2, + ) + + +ydb_dynamic_config = ydb_dynamic_config_pb2 + + +class DynamicConfigService(object): + Stub = ydb_dynamic_config_v1_pb2_grpc.DynamicConfigServiceStub + + ReplaceConfig = "ReplaceConfig" + SetConfig = "SetConfig" + GetConfig = "GetConfig" + GetNodeLabels = "GetNodeLabels" diff --git a/contrib/python/ydb/py3/ydb/draft/dynamic_config.py b/contrib/python/ydb/py3/ydb/draft/dynamic_config.py index afec19eca7..4648b29a77 100644 --- a/contrib/python/ydb/py3/ydb/draft/dynamic_config.py +++ b/contrib/python/ydb/py3/ydb/draft/dynamic_config.py @@ -1,6 +1,7 @@ import abc from abc import abstractmethod -from .. import issues, operation, _apis +from . import _apis +from .. import issues, operation class IDynamicConfigClient(abc.ABC): diff --git a/contrib/python/ydb/py3/ydb/driver.py b/contrib/python/ydb/py3/ydb/driver.py index 89109b9b57..ecd3319e87 100644 --- a/contrib/python/ydb/py3/ydb/driver.py +++ b/contrib/python/ydb/py3/ydb/driver.py @@ -54,6 +54,13 @@ def credentials_from_env_variables(tracer=None): ctx.trace({"credentials.access_token": True}) return credentials_impl.AuthTokenCredentials(access_token) + oauth2_key_file = os.getenv("YDB_OAUTH2_KEY_FILE") + if oauth2_key_file: + ctx.trace({"credentials.oauth2_key_file": True}) + import ydb.oauth2_token_exchange + + return ydb.oauth2_token_exchange.Oauth2TokenExchangeCredentials.from_file(oauth2_key_file) + ctx.trace( { "credentials.env_default": True, diff --git a/contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py b/contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py index e4d1db9498..819f719b6a 100644 --- a/contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py +++ b/contrib/python/ydb/py3/ydb/oauth2_token_exchange/token_exchange.py @@ -2,6 +2,8 @@ import typing import json import abc +import os +import base64 try: import requests @@ -9,7 +11,24 @@ except ImportError: requests = None from ydb import credentials, tracing, issues -from .token_source import TokenSource +from .token_source import TokenSource, FixedTokenSource, JwtTokenSource + + +# method -> is HMAC +_supported_uppercase_jwt_algs = { + "HS256": True, + "HS384": True, + "HS512": True, + "RS256": False, + "RS384": False, + "RS512": False, + "PS256": False, + "PS384": False, + "PS512": False, + "ES256": False, + "ES384": False, + "ES512": False, +} class Oauth2TokenExchangeCredentialsBase(abc.ABC): @@ -20,7 +39,7 @@ class Oauth2TokenExchangeCredentialsBase(abc.ABC): actor_token_source: typing.Optional[TokenSource] = None, audience: typing.Union[typing.List[str], str, None] = None, scope: typing.Union[typing.List[str], str, None] = None, - resource: typing.Optional[str] = None, + resource: typing.Union[typing.List[str], str, None] = None, grant_type: str = "urn:ietf:params:oauth:grant-type:token-exchange", requested_token_type: str = "urn:ietf:params:oauth:token-type:access_token", ): @@ -94,6 +113,193 @@ class Oauth2TokenExchangeCredentialsBase(abc.ABC): return params + @classmethod + def _jwt_token_source_from_config(cls, cfg_json): + signing_method = cls._required_string_from_config(cfg_json, "alg") + is_hmac = _supported_uppercase_jwt_algs.get(signing_method.upper(), None) + if is_hmac is not None: # we know this method => do uppercase + signing_method = signing_method.upper() + private_key = cls._required_string_from_config(cfg_json, "private-key") + if is_hmac: # decode from base64 + private_key = base64.b64decode(private_key + "===") # to allow unpadded strings + return JwtTokenSource( + signing_method=signing_method, + private_key=private_key, + key_id=cls._string_with_default_from_config(cfg_json, "kid", None), + issuer=cls._string_with_default_from_config(cfg_json, "iss", None), + subject=cls._string_with_default_from_config(cfg_json, "sub", None), + audience=cls._list_of_strings_or_single_from_config(cfg_json, "aud"), + id=cls._string_with_default_from_config(cfg_json, "jti", None), + token_ttl_seconds=cls._duration_seconds_from_config(cfg_json, "ttl", 3600), + ) + + @classmethod + def _fixed_token_source_from_config(cls, cfg_json): + return FixedTokenSource( + cls._required_string_from_config(cfg_json, "token"), + cls._required_string_from_config(cfg_json, "token-type"), + ) + + @classmethod + def _token_source_from_config(cls, cfg_json, key_name): + value = cfg_json.get(key_name, None) + if value is None: + return None + if not isinstance(value, dict): + raise Exception('Key "{}" is expected to be a json map'.format(key_name)) + + source_type = cls._required_string_from_config(value, "type") + if source_type.upper() == "FIXED": + return cls._fixed_token_source_from_config(value) + if source_type.upper() == "JWT": + return cls._jwt_token_source_from_config(value) + raise Exception('"{}": unknown token source type: "{}"'.format(key_name, source_type)) + + @classmethod + def _list_of_strings_or_single_from_config(cls, cfg_json, key_name): + value = cfg_json.get(key_name, None) + if value is None: + return None + if isinstance(value, list): + for val in value: + if not isinstance(val, str) or not val: + raise Exception( + 'Key "{}" is expected to be a single string or list of nonempty strings'.format(key_name) + ) + return value + else: + if isinstance(value, str): + return value + raise Exception('Key "{}" is expected to be a single string or list of nonempty strings'.format(key_name)) + + @classmethod + def _required_string_from_config(cls, cfg_json, key_name): + value = cfg_json.get(key_name, None) + if value is None or not isinstance(value, str) or not value: + raise Exception('Key "{}" is expected to be a nonempty string'.format(key_name)) + return value + + @classmethod + def _string_with_default_from_config(cls, cfg_json, key_name, default_value): + value = cfg_json.get(key_name, None) + if value is None: + return default_value + if not isinstance(value, str): + raise Exception('Key "{}" is expected to be a string'.format(key_name)) + return value + + @classmethod + def _duration_seconds_from_config(cls, cfg_json, key_name, default_value): + value = cfg_json.get(key_name, None) + if value is None: + return default_value + if not isinstance(value, str): + raise Exception('Key "{}" is expected to be a string'.format(key_name)) + multiplier = 1 + if value.endswith("s"): + multiplier = 1 + value = value[:-1] + elif value.endswith("m"): + multiplier = 60 + value = value[:-1] + elif value.endswith("h"): + multiplier = 3600 + value = value[:-1] + elif value.endswith("d"): + multiplier = 3600 * 24 + value = value[:-1] + elif value.endswith("ms"): + multiplier = 1.0 / 1000 + value = value[:-2] + elif value.endswith("us"): + multiplier = 1.0 / 1000000 + value = value[:-2] + elif value.endswith("ns"): + multiplier = 1.0 / 1000000000 + value = value[:-2] + f = float(value) + if f < 0.0: + raise Exception("{}: negative duration is not allowed".format(value)) + return int(f * multiplier) + + @classmethod + def from_file(cls, cfg_file, iam_endpoint=None): + """ + Create OAuth 2.0 token exchange protocol credentials from config file. + + https://www.rfc-editor.org/rfc/rfc8693 + Config file must be a valid json file + + Fields of json file + grant-type: [string] Grant type option (default: "urn:ietf:params:oauth:grant-type:token-exchange") + res: [string | list of strings] Resource option (optional) + aud: [string | list of strings] Audience option for token exchange request (optional) + scope: [string | list of strings] Scope option (optional) + requested-token-type: [string] Requested token type option (default: "urn:ietf:params:oauth:token-type:access_token") + subject-credentials: [creds_json] Subject credentials options (optional) + actor-credentials: [creds_json] Actor credentials options (optional) + token-endpoint: [string] Token endpoint + + Fields of creds_json (JWT): + type: [string] Token source type. Set JWT + alg: [string] Algorithm for JWT signature. + Supported algorithms can be listed + with GetSupportedOauth2TokenExchangeJwtAlgorithms() + private-key: [string] (Private) key in PEM format (RSA, EC) or Base64 format (HMAC) for JWT signature + kid: [string] Key id JWT standard claim (optional) + iss: [string] Issuer JWT standard claim (optional) + sub: [string] Subject JWT standard claim (optional) + aud: [string | list of strings] Audience JWT standard claim (optional) + jti: [string] JWT ID JWT standard claim (optional) + ttl: [string] Token TTL (default: 1h) + + Fields of creds_json (FIXED): + type: [string] Token source type. Set FIXED + token: [string] Token value + token-type: [string] Token type value. It will become + subject_token_type/actor_token_type parameter + in token exchange request (https://www.rfc-editor.org/rfc/rfc8693) + """ + with open(os.path.expanduser(cfg_file), "r") as r: + cfg = r.read() + + return cls.from_content(cfg, iam_endpoint=iam_endpoint) + + @classmethod + def from_content(cls, cfg, iam_endpoint=None): + try: + cfg_json = json.loads(cfg) + except Exception as ex: + raise Exception("Failed to parse json config: {}".format(ex)) + + if iam_endpoint is not None: + token_endpoint = iam_endpoint + else: + token_endpoint = cfg_json.get("token-endpoint", "") + + subject_token_source = cls._token_source_from_config(cfg_json, "subject-credentials") + actor_token_source = cls._token_source_from_config(cfg_json, "actor-credentials") + audience = cls._list_of_strings_or_single_from_config(cfg_json, "aud") + scope = cls._list_of_strings_or_single_from_config(cfg_json, "scope") + resource = cls._list_of_strings_or_single_from_config(cfg_json, "res") + grant_type = cls._string_with_default_from_config( + cfg_json, "grant-type", "urn:ietf:params:oauth:grant-type:token-exchange" + ) + requested_token_type = cls._string_with_default_from_config( + cfg_json, "requested-token-type", "urn:ietf:params:oauth:token-type:access_token" + ) + + return cls( + token_endpoint=token_endpoint, + subject_token_source=subject_token_source, + actor_token_source=actor_token_source, + audience=audience, + scope=scope, + resource=resource, + grant_type=grant_type, + requested_token_type=requested_token_type, + ) + class Oauth2TokenExchangeCredentials(credentials.AbstractExpiringTokenCredentials, Oauth2TokenExchangeCredentialsBase): def __init__( @@ -103,7 +309,7 @@ class Oauth2TokenExchangeCredentials(credentials.AbstractExpiringTokenCredential actor_token_source: typing.Optional[TokenSource] = None, audience: typing.Union[typing.List[str], str, None] = None, scope: typing.Union[typing.List[str], str, None] = None, - resource: typing.Optional[str] = None, + resource: typing.Union[typing.List[str], str, None] = None, grant_type: str = "urn:ietf:params:oauth:grant-type:token-exchange", requested_token_type: str = "urn:ietf:params:oauth:token-type:access_token", tracer=None, diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py new file mode 100644 index 0000000000..eb967abc20 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/query/__init__.py @@ -0,0 +1,40 @@ +__all__ = [ + "QueryOnlineReadOnly", + "QuerySerializableReadWrite", + "QuerySnapshotReadOnly", + "QueryStaleReadOnly", + "QuerySessionPool", + "QueryClientSync", + "QuerySessionSync", +] + +import logging + +from .base import ( + IQueryClient, + SupportedDriverType, + QueryClientSettings, +) + +from .session import QuerySessionSync + +from .._grpc.grpcwrapper.ydb_query_public_types import ( + QueryOnlineReadOnly, + QuerySerializableReadWrite, + QuerySnapshotReadOnly, + QueryStaleReadOnly, +) + +from .pool import QuerySessionPool + +logger = logging.getLogger(__name__) + + +class QueryClientSync(IQueryClient): + def __init__(self, driver: SupportedDriverType, query_client_settings: QueryClientSettings = None): + logger.warning("QueryClientSync is an experimental API, which could be changed.") + self._driver = driver + self._settings = query_client_settings + + def session(self) -> QuerySessionSync: + return QuerySessionSync(self._driver, self._settings) diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py new file mode 100644 index 0000000000..e08d9f52d0 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/query/base.py @@ -0,0 +1,388 @@ +import abc +import enum +import functools + +from typing import ( + Iterator, + Optional, +) + +from .._grpc.grpcwrapper.common_utils import ( + SupportedDriverType, +) +from .._grpc.grpcwrapper import ydb_query +from .._grpc.grpcwrapper.ydb_query_public_types import ( + BaseQueryTxMode, +) +from ..connection import _RpcState as RpcState +from .. import convert +from .. import issues +from .. import _utilities +from .. import _apis + + +class QuerySyntax(enum.IntEnum): + UNSPECIFIED = 0 + YQL_V1 = 1 + PG = 2 + + +class QueryExecMode(enum.IntEnum): + UNSPECIFIED = 0 + PARSE = 10 + VALIDATE = 20 + EXPLAIN = 30 + EXECUTE = 50 + + +class StatsMode(enum.IntEnum): + UNSPECIFIED = 0 + NONE = 10 + BASIC = 20 + FULL = 30 + PROFILE = 40 + + +class SyncResponseContextIterator(_utilities.SyncResponseIterator): + def __enter__(self) -> "SyncResponseContextIterator": + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + for _ in self: + pass + + +class QueryClientSettings: + pass + + +class IQuerySessionState(abc.ABC): + def __init__(self, settings: Optional[QueryClientSettings] = None): + pass + + @abc.abstractmethod + def reset(self) -> None: + pass + + @property + @abc.abstractmethod + def session_id(self) -> Optional[str]: + pass + + @abc.abstractmethod + def set_session_id(self, session_id: str) -> "IQuerySessionState": + pass + + @property + @abc.abstractmethod + def node_id(self) -> Optional[int]: + pass + + @abc.abstractmethod + def set_node_id(self, node_id: int) -> "IQuerySessionState": + pass + + @property + @abc.abstractmethod + def attached(self) -> bool: + pass + + @abc.abstractmethod + def set_attached(self, attached: bool) -> "IQuerySessionState": + pass + + +class IQuerySession(abc.ABC): + """Session object for Query Service. It is not recommended to control + session's lifecycle manually - use a QuerySessionPool is always a better choise. + """ + + @abc.abstractmethod + def __init__(self, driver: SupportedDriverType, settings: Optional[QueryClientSettings] = None): + pass + + @abc.abstractmethod + def create(self) -> "IQuerySession": + """WARNING: This API is experimental and could be changed. + + Creates a Session of Query Service on server side and attaches it. + + :return: Session object. + """ + pass + + @abc.abstractmethod + def delete(self) -> None: + """WARNING: This API is experimental and could be changed. + + Deletes a Session of Query Service on server side and releases resources. + + :return: None + """ + pass + + @abc.abstractmethod + def transaction(self, tx_mode: Optional[BaseQueryTxMode] = None) -> "IQueryTxContext": + """WARNING: This API is experimental and could be changed. + + Creates a transaction context manager with specified transaction mode. + + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + + :return: transaction context manager. + """ + pass + + @abc.abstractmethod + def execute( + self, + query: str, + syntax: Optional[QuerySyntax] = None, + exec_mode: Optional[QueryExecMode] = None, + parameters: Optional[dict] = None, + concurrent_result_sets: Optional[bool] = False, + ) -> Iterator: + """WARNING: This API is experimental and could be changed. + + Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. + :param syntax: Syntax of the query, which is a one from the following choises: + 1) QuerySyntax.YQL_V1, which is default; + 2) QuerySyntax.PG. + :param parameters: dict with parameters and YDB types; + :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + + :return: Iterator with result sets + """ + + +class IQueryTxContext(abc.ABC): + """ + An object that provides a simple transaction context manager that allows statements execution + in a transaction. You don't have to open transaction explicitly, because context manager encapsulates + transaction control logic, and opens new transaction if: + 1) By explicit .begin(); + 2) On execution of a first statement, which is strictly recommended method, because that avoids + useless round trip + + This context manager is not thread-safe, so you should not manipulate on it concurrently. + """ + + @abc.abstractmethod + def __init__( + self, + driver: SupportedDriverType, + session_state: IQuerySessionState, + session: IQuerySession, + tx_mode: BaseQueryTxMode, + ): + """ + An object that provides a simple transaction context manager that allows statements execution + in a transaction. You don't have to open transaction explicitly, because context manager encapsulates + transaction control logic, and opens new transaction if: + + 1) By explicit .begin() method; + 2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip + + This context manager is not thread-safe, so you should not manipulate on it concurrently. + + :param driver: A driver instance + :param session_state: A state of session + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + """ + pass + + @abc.abstractmethod + def __enter__(self) -> "IQueryTxContext": + """ + Enters a context manager and returns a transaction + + :return: A transaction instance + """ + pass + + @abc.abstractmethod + def __exit__(self, *args, **kwargs): + """ + Closes a transaction context manager and rollbacks transaction if + it is not finished explicitly + """ + pass + + @property + @abc.abstractmethod + def session_id(self) -> str: + """ + A transaction's session id + + :return: A transaction's session id + """ + pass + + @property + @abc.abstractmethod + def tx_id(self) -> Optional[str]: + """ + Returns an id of open transaction or None otherwise + + :return: An id of open transaction or None otherwise + """ + pass + + @abc.abstractmethod + def begin(self, settings: Optional[QueryClientSettings] = None) -> None: + """WARNING: This API is experimental and could be changed. + + Explicitly begins a transaction + + :param settings: A request settings + + :return: None or exception if begin is failed + """ + pass + + @abc.abstractmethod + def commit(self, settings: Optional[QueryClientSettings] = None) -> None: + """WARNING: This API is experimental and could be changed. + + Calls commit on a transaction if it is open. If transaction execution + failed then this method raises PreconditionFailed. + + :param settings: A request settings + + :return: None or exception if commit is failed + """ + pass + + @abc.abstractmethod + def rollback(self, settings: Optional[QueryClientSettings] = None) -> None: + """WARNING: This API is experimental and could be changed. + + Calls rollback on a transaction if it is open. If transaction execution + failed then this method raises PreconditionFailed. + + :param settings: A request settings + + :return: None or exception if rollback is failed + """ + pass + + @abc.abstractmethod + def execute( + self, + query: str, + commit_tx: Optional[bool] = False, + syntax: Optional[QuerySyntax] = None, + exec_mode: Optional[QueryExecMode] = None, + parameters: Optional[dict] = None, + concurrent_result_sets: Optional[bool] = False, + ) -> Iterator: + """WARNING: This API is experimental and could be changed. + + Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. + :param commit_tx: A special flag that allows transaction commit. + :param syntax: Syntax of the query, which is a one from the following choises: + 1) QuerySyntax.YQL_V1, which is default; + 2) QuerySyntax.PG. + :param exec_mode: Exec mode of the query, which is a one from the following choises: + 1) QueryExecMode.EXECUTE, which is default; + 2) QueryExecMode.EXPLAIN; + 3) QueryExecMode.VALIDATE; + 4) QueryExecMode.PARSE. + :param parameters: dict with parameters and YDB types; + :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + + :return: Iterator with result sets + """ + pass + + +class IQueryClient(abc.ABC): + def __init__(self, driver: SupportedDriverType, query_client_settings: Optional[QueryClientSettings] = None): + pass + + @abc.abstractmethod + def session(self) -> IQuerySession: + pass + + +def create_execute_query_request( + query: str, + session_id: str, + tx_id: Optional[str], + commit_tx: Optional[bool], + tx_mode: Optional[BaseQueryTxMode], + syntax: Optional[QuerySyntax], + exec_mode: Optional[QueryExecMode], + parameters: Optional[dict], + concurrent_result_sets: Optional[bool], +) -> ydb_query.ExecuteQueryRequest: + syntax = QuerySyntax.YQL_V1 if not syntax else syntax + exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode + stats_mode = StatsMode.NONE # TODO: choise is not supported yet + + tx_control = None + if not tx_id and not tx_mode: + tx_control = None + elif tx_id: + tx_control = ydb_query.TransactionControl( + tx_id=tx_id, + commit_tx=commit_tx, + begin_tx=None, + ) + else: + tx_control = ydb_query.TransactionControl( + begin_tx=ydb_query.TransactionSettings( + tx_mode=tx_mode, + ), + commit_tx=commit_tx, + tx_id=None, + ) + + return ydb_query.ExecuteQueryRequest( + session_id=session_id, + query_content=ydb_query.QueryContent.from_public( + query=query, + syntax=syntax, + ), + tx_control=tx_control, + exec_mode=exec_mode, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + stats_mode=stats_mode, + ) + + +def wrap_execute_query_response( + rpc_state: RpcState, + response_pb: _apis.ydb_query.ExecuteQueryResponsePart, + tx: Optional[IQueryTxContext] = None, + commit_tx: Optional[bool] = False, +) -> convert.ResultSet: + issues._process_response(response_pb) + if tx and response_pb.tx_meta and not tx.tx_id: + tx._move_to_beginned(response_pb.tx_meta.id) + if tx and commit_tx: + tx._move_to_commited() + return convert.ResultSet.from_message(response_pb.result_set) + + +def bad_session_handler(func): + @functools.wraps(func) + def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args, **kwargs): + try: + return func(rpc_state, response_pb, session_state, *args, **kwargs) + except issues.BadSession: + session_state.reset() + raise + + return decorator diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py new file mode 100644 index 0000000000..e7514cdf76 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -0,0 +1,91 @@ +import logging +from typing import ( + Callable, + Optional, + List, +) + +from . import base +from .session import ( + QuerySessionSync, +) +from ..retries import ( + RetrySettings, + retry_operation_sync, +) +from .. import convert + +logger = logging.getLogger(__name__) + + +class QuerySessionPool: + """QuerySessionPool is an object to simplify operations with sessions of Query Service.""" + + def __init__(self, driver: base.SupportedDriverType): + """ + :param driver: A driver instance + """ + + logger.warning("QuerySessionPool is an experimental API, which could be changed.") + self._driver = driver + + def checkout(self) -> "SimpleQuerySessionCheckout": + """WARNING: This API is experimental and could be changed. + Return a Session context manager, that opens session on enter and closes session on exit. + """ + + return SimpleQuerySessionCheckout(self) + + def retry_operation_sync(self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs): + """WARNING: This API is experimental and could be changed. + Special interface to execute a bunch of commands with session in a safe, retriable way. + + :param callee: A function, that works with session. + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + def wrapped_callee(): + with self.checkout() as session: + return callee(session, *args, **kwargs) + + return retry_operation_sync(wrapped_callee, retry_settings) + + def execute_with_retries( + self, query: str, retry_settings: Optional[RetrySettings] = None, *args, **kwargs + ) -> List[convert.ResultSet]: + """WARNING: This API is experimental and could be changed. + Special interface to execute a one-shot queries in a safe, retriable way. + Note: this method loads all data from stream before return, do not use this + method with huge read queries. + + :param query: A query, yql or sql text. + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + def wrapped_callee(): + with self.checkout() as session: + it = session.execute(query, *args, **kwargs) + return [result_set for result_set in it] + + return retry_operation_sync(wrapped_callee, retry_settings) + + +class SimpleQuerySessionCheckout: + def __init__(self, pool: QuerySessionPool): + self._pool = pool + self._session = QuerySessionSync(pool._driver) + + def __enter__(self) -> base.IQuerySession: + self._session.create() + return self._session + + def __exit__(self, exc_type, exc_val, exc_tb): + self._session.delete() diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py new file mode 100644 index 0000000000..d6034d348a --- /dev/null +++ b/contrib/python/ydb/py3/ydb/query/session.py @@ -0,0 +1,317 @@ +import abc +import enum +import logging +import threading +from typing import ( + Iterable, + Optional, +) + +from . import base + +from .. import _apis, issues, _utilities +from ..connection import _RpcState as RpcState +from .._grpc.grpcwrapper import common_utils +from .._grpc.grpcwrapper import ydb_query as _ydb_query +from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public + +from .transaction import BaseQueryTxContext + + +logger = logging.getLogger(__name__) + + +class QuerySessionStateEnum(enum.Enum): + NOT_INITIALIZED = "NOT_INITIALIZED" + CREATED = "CREATED" + CLOSED = "CLOSED" + + +class QuerySessionStateHelper(abc.ABC): + _VALID_TRANSITIONS = { + QuerySessionStateEnum.NOT_INITIALIZED: [QuerySessionStateEnum.CREATED], + QuerySessionStateEnum.CREATED: [QuerySessionStateEnum.CLOSED], + QuerySessionStateEnum.CLOSED: [], + } + + _READY_TO_USE = [ + QuerySessionStateEnum.CREATED, + ] + + @classmethod + def valid_transition(cls, before: QuerySessionStateEnum, after: QuerySessionStateEnum) -> bool: + return after in cls._VALID_TRANSITIONS[before] + + @classmethod + def ready_to_use(cls, state: QuerySessionStateEnum) -> bool: + return state in cls._READY_TO_USE + + +class QuerySessionState(base.IQuerySessionState): + _session_id: Optional[str] = None + _node_id: Optional[int] = None + _attached: bool = False + _settings: Optional[base.QueryClientSettings] = None + _state: QuerySessionStateEnum = QuerySessionStateEnum.NOT_INITIALIZED + + def __init__(self, settings: base.QueryClientSettings = None): + self._settings = settings + + def reset(self) -> None: + self._session_id = None + self._node_id = None + self._attached = False + + @property + def session_id(self) -> Optional[str]: + return self._session_id + + def set_session_id(self, session_id: str) -> "QuerySessionState": + self._session_id = session_id + return self + + @property + def node_id(self) -> Optional[int]: + return self._node_id + + def set_node_id(self, node_id: int) -> "QuerySessionState": + self._node_id = node_id + return self + + @property + def attached(self) -> bool: + return self._attached + + def set_attached(self, attached: bool) -> "QuerySessionState": + self._attached = attached + + def _check_invalid_transition(self, target: QuerySessionStateEnum) -> None: + if not QuerySessionStateHelper.valid_transition(self._state, target): + raise RuntimeError(f"Session could not be moved from {self._state.value} to {target.value}") + + def _change_state(self, target: QuerySessionStateEnum) -> None: + self._check_invalid_transition(target) + self._state = target + + def _check_session_ready_to_use(self) -> None: + if not QuerySessionStateHelper.ready_to_use(self._state): + raise RuntimeError(f"Session is not ready to use, current state: {self._state.value}") + + def _already_in(self, target) -> bool: + return self._state == target + + +def wrapper_create_session( + rpc_state: RpcState, + response_pb: _apis.ydb_query.CreateSessionResponse, + session_state: QuerySessionState, + session: "BaseQuerySession", +) -> "BaseQuerySession": + message = _ydb_query.CreateSessionResponse.from_proto(response_pb) + issues._process_response(message.status) + session_state.set_session_id(message.session_id).set_node_id(message.node_id) + return session + + +def wrapper_delete_session( + rpc_state: RpcState, + response_pb: _apis.ydb_query.DeleteSessionResponse, + session_state: QuerySessionState, + session: "BaseQuerySession", +) -> "BaseQuerySession": + message = _ydb_query.DeleteSessionResponse.from_proto(response_pb) + issues._process_response(message.status) + session_state.reset() + session_state._change_state(QuerySessionStateEnum.CLOSED) + return session + + +class BaseQuerySession(base.IQuerySession): + _driver: base.SupportedDriverType + _settings: base.QueryClientSettings + _state: QuerySessionState + + def __init__(self, driver: base.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None): + self._driver = driver + self._settings = settings if settings is not None else base.QueryClientSettings() + self._state = QuerySessionState(settings) + + def _create_call(self) -> "BaseQuerySession": + return self._driver( + _apis.ydb_query.CreateSessionRequest(), + _apis.QueryService.Stub, + _apis.QueryService.CreateSession, + wrap_result=wrapper_create_session, + wrap_args=(self._state, self), + ) + + def _delete_call(self) -> "BaseQuerySession": + return self._driver( + _apis.ydb_query.DeleteSessionRequest(session_id=self._state.session_id), + _apis.QueryService.Stub, + _apis.QueryService.DeleteSession, + wrap_result=wrapper_delete_session, + wrap_args=(self._state, self), + ) + + def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]: + return self._driver( + _apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id), + _apis.QueryService.Stub, + _apis.QueryService.AttachSession, + ) + + def _execute_call( + self, + query: str, + commit_tx: bool = False, + syntax: base.QuerySyntax = None, + exec_mode: base.QueryExecMode = None, + parameters: dict = None, + concurrent_result_sets: bool = False, + ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: + request = base.create_execute_query_request( + query=query, + session_id=self._state.session_id, + commit_tx=commit_tx, + tx_mode=None, + tx_id=None, + syntax=syntax, + exec_mode=exec_mode, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + ) + + return self._driver( + request.to_proto(), + _apis.QueryService.Stub, + _apis.QueryService.ExecuteQuery, + ) + + +class QuerySessionSync(BaseQuerySession): + """Session object for Query Service. It is not recommended to control + session's lifecycle manually - use a QuerySessionPool is always a better choise. + """ + + _stream = None + + def _attach(self) -> None: + self._stream = self._attach_call() + status_stream = _utilities.SyncResponseIterator( + self._stream, + lambda response: common_utils.ServerStatus.from_proto(response), + ) + + first_response = next(status_stream) + if first_response.status != issues.StatusCode.SUCCESS: + pass + + self._state.set_attached(True) + self._state._change_state(QuerySessionStateEnum.CREATED) + + threading.Thread( + target=self._check_session_status_loop, + args=(status_stream,), + name="check session status thread", + daemon=True, + ).start() + + def _check_session_status_loop(self, status_stream: _utilities.SyncResponseIterator) -> None: + try: + for status in status_stream: + if status.status != issues.StatusCode.SUCCESS: + self._state.reset() + self._state._change_state(QuerySessionStateEnum.CLOSED) + except Exception: + pass + + def delete(self) -> None: + """WARNING: This API is experimental and could be changed. + + Deletes a Session of Query Service on server side and releases resources. + + :return: None + """ + if self._state._already_in(QuerySessionStateEnum.CLOSED): + return + + self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED) + self._delete_call() + self._stream.cancel() + + def create(self) -> "QuerySessionSync": + """WARNING: This API is experimental and could be changed. + + Creates a Session of Query Service on server side and attaches it. + + :return: QuerySessionSync object. + """ + if self._state._already_in(QuerySessionStateEnum.CREATED): + return + + self._state._check_invalid_transition(QuerySessionStateEnum.CREATED) + self._create_call() + self._attach() + + return self + + def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> base.IQueryTxContext: + """WARNING: This API is experimental and could be changed. + + Creates a transaction context manager with specified transaction mode. + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + + :return transaction context manager. + + """ + self._state._check_session_ready_to_use() + + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() + + return BaseQueryTxContext( + self._driver, + self._state, + self, + tx_mode, + ) + + def execute( + self, + query: str, + syntax: base.QuerySyntax = None, + exec_mode: base.QueryExecMode = None, + parameters: dict = None, + concurrent_result_sets: bool = False, + ) -> base.SyncResponseContextIterator: + """WARNING: This API is experimental and could be changed. + + Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. + :param syntax: Syntax of the query, which is a one from the following choises: + 1) QuerySyntax.YQL_V1, which is default; + 2) QuerySyntax.PG. + :param parameters: dict with parameters and YDB types; + :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + + :return: Iterator with result sets + """ + self._state._check_session_ready_to_use() + + stream_it = self._execute_call( + query=query, + commit_tx=True, + syntax=syntax, + exec_mode=exec_mode, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + ) + + return base.SyncResponseContextIterator( + stream_it, + lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp), + ) diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py new file mode 100644 index 0000000000..0a49320293 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/query/transaction.py @@ -0,0 +1,412 @@ +import abc +import logging +import enum +import functools +from typing import ( + Iterable, + Optional, +) + +from .. import ( + _apis, + issues, +) +from .._grpc.grpcwrapper import ydb_query as _ydb_query +from ..connection import _RpcState as RpcState + +from . import base + +logger = logging.getLogger(__name__) + + +class QueryTxStateEnum(enum.Enum): + NOT_INITIALIZED = "NOT_INITIALIZED" + BEGINED = "BEGINED" + COMMITTED = "COMMITTED" + ROLLBACKED = "ROLLBACKED" + DEAD = "DEAD" + + +class QueryTxStateHelper(abc.ABC): + _VALID_TRANSITIONS = { + QueryTxStateEnum.NOT_INITIALIZED: [ + QueryTxStateEnum.BEGINED, + QueryTxStateEnum.DEAD, + QueryTxStateEnum.COMMITTED, + QueryTxStateEnum.ROLLBACKED, + ], + QueryTxStateEnum.BEGINED: [QueryTxStateEnum.COMMITTED, QueryTxStateEnum.ROLLBACKED, QueryTxStateEnum.DEAD], + QueryTxStateEnum.COMMITTED: [], + QueryTxStateEnum.ROLLBACKED: [], + QueryTxStateEnum.DEAD: [], + } + + @classmethod + def valid_transition(cls, before: QueryTxStateEnum, after: QueryTxStateEnum) -> bool: + return after in cls._VALID_TRANSITIONS[before] + + @classmethod + def terminal(cls, state: QueryTxStateEnum) -> bool: + return len(cls._VALID_TRANSITIONS[state]) == 0 + + +def reset_tx_id_handler(func): + @functools.wraps(func) + def decorator( + rpc_state, response_pb, session_state: base.IQuerySessionState, tx_state: QueryTxState, *args, **kwargs + ): + try: + return func(rpc_state, response_pb, session_state, tx_state, *args, **kwargs) + except issues.Error: + tx_state._change_state(QueryTxStateEnum.DEAD) + tx_state.tx_id = None + raise + + return decorator + + +class QueryTxState: + def __init__(self, tx_mode: base.BaseQueryTxMode): + """ + Holds transaction context manager info + :param tx_mode: A mode of transaction + """ + self.tx_id = None + self.tx_mode = tx_mode + self._state = QueryTxStateEnum.NOT_INITIALIZED + + def _check_invalid_transition(self, target: QueryTxStateEnum) -> None: + if not QueryTxStateHelper.valid_transition(self._state, target): + raise RuntimeError(f"Transaction could not be moved from {self._state.value} to {target.value}") + + def _change_state(self, target: QueryTxStateEnum) -> None: + self._check_invalid_transition(target) + self._state = target + + def _check_tx_ready_to_use(self) -> None: + if QueryTxStateHelper.terminal(self._state): + raise RuntimeError(f"Transaction is in terminal state: {self._state.value}") + + def _already_in(self, target: QueryTxStateEnum) -> bool: + return self._state == target + + +def _construct_tx_settings(tx_state: QueryTxState) -> _ydb_query.TransactionSettings: + tx_settings = _ydb_query.TransactionSettings.from_public(tx_state.tx_mode) + return tx_settings + + +def _create_begin_transaction_request( + session_state: base.IQuerySessionState, tx_state: QueryTxState +) -> _apis.ydb_query.BeginTransactionRequest: + request = _ydb_query.BeginTransactionRequest( + session_id=session_state.session_id, + tx_settings=_construct_tx_settings(tx_state), + ).to_proto() + return request + + +def _create_commit_transaction_request( + session_state: base.IQuerySessionState, tx_state: QueryTxState +) -> _apis.ydb_query.CommitTransactionRequest: + request = _apis.ydb_query.CommitTransactionRequest() + request.tx_id = tx_state.tx_id + request.session_id = session_state.session_id + return request + + +def _create_rollback_transaction_request( + session_state: base.IQuerySessionState, tx_state: QueryTxState +) -> _apis.ydb_query.RollbackTransactionRequest: + request = _apis.ydb_query.RollbackTransactionRequest() + request.tx_id = tx_state.tx_id + request.session_id = session_state.session_id + return request + + +@base.bad_session_handler +def wrap_tx_begin_response( + rpc_state: RpcState, + response_pb: _apis.ydb_query.BeginTransactionResponse, + session_state: base.IQuerySessionState, + tx_state: QueryTxState, + tx: "BaseQueryTxContext", +) -> "BaseQueryTxContext": + message = _ydb_query.BeginTransactionResponse.from_proto(response_pb) + issues._process_response(message.status) + tx_state._change_state(QueryTxStateEnum.BEGINED) + tx_state.tx_id = message.tx_meta.tx_id + return tx + + +@base.bad_session_handler +@reset_tx_id_handler +def wrap_tx_commit_response( + rpc_state: RpcState, + response_pb: _apis.ydb_query.CommitTransactionResponse, + session_state: base.IQuerySessionState, + tx_state: QueryTxState, + tx: "BaseQueryTxContext", +) -> "BaseQueryTxContext": + message = _ydb_query.CommitTransactionResponse.from_proto(response_pb) + issues._process_response(message.status) + tx_state._change_state(QueryTxStateEnum.COMMITTED) + return tx + + +@base.bad_session_handler +@reset_tx_id_handler +def wrap_tx_rollback_response( + rpc_state: RpcState, + response_pb: _apis.ydb_query.RollbackTransactionResponse, + session_state: base.IQuerySessionState, + tx_state: QueryTxState, + tx: "BaseQueryTxContext", +) -> "BaseQueryTxContext": + message = _ydb_query.RollbackTransactionResponse.from_proto(response_pb) + issues._process_response(message.status) + tx_state._change_state(QueryTxStateEnum.ROLLBACKED) + return tx + + +class BaseQueryTxContext(base.IQueryTxContext): + def __init__(self, driver, session_state, session, tx_mode): + """ + An object that provides a simple transaction context manager that allows statements execution + in a transaction. You don't have to open transaction explicitly, because context manager encapsulates + transaction control logic, and opens new transaction if: + + 1) By explicit .begin() method; + 2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip + + This context manager is not thread-safe, so you should not manipulate on it concurrently. + + :param driver: A driver instance + :param session_state: A state of session + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + """ + + self._driver = driver + self._tx_state = QueryTxState(tx_mode) + self._session_state = session_state + self.session = session + self._prev_stream = None + + def __enter__(self) -> "BaseQueryTxContext": + """ + Enters a context manager and returns a transaction + + :return: A transaction instance + """ + return self + + def __exit__(self, *args, **kwargs): + """ + Closes a transaction context manager and rollbacks transaction if + it is not finished explicitly + """ + self._ensure_prev_stream_finished() + if self._tx_state._state == QueryTxStateEnum.BEGINED: + # It's strictly recommended to close transactions directly + # by using commit_tx=True flag while executing statement or by + # .commit() or .rollback() methods, but here we trying to do best + # effort to avoid useless open transactions + logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id) + try: + self.rollback() + except issues.Error: + logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id) + + @property + def session_id(self) -> str: + """ + A transaction's session id + + :return: A transaction's session id + """ + return self._session_state.session_id + + @property + def tx_id(self) -> Optional[str]: + """ + Returns an id of open transaction or None otherwise + + :return: An id of open transaction or None otherwise + """ + return self._tx_state.tx_id + + def _begin_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + return self._driver( + _create_begin_transaction_request(self._session_state, self._tx_state), + _apis.QueryService.Stub, + _apis.QueryService.BeginTransaction, + wrap_tx_begin_response, + settings, + (self._session_state, self._tx_state, self), + ) + + def _commit_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + return self._driver( + _create_commit_transaction_request(self._session_state, self._tx_state), + _apis.QueryService.Stub, + _apis.QueryService.CommitTransaction, + wrap_tx_commit_response, + settings, + (self._session_state, self._tx_state, self), + ) + + def _rollback_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + return self._driver( + _create_rollback_transaction_request(self._session_state, self._tx_state), + _apis.QueryService.Stub, + _apis.QueryService.RollbackTransaction, + wrap_tx_rollback_response, + settings, + (self._session_state, self._tx_state, self), + ) + + def _execute_call( + self, + query: str, + commit_tx: bool = False, + syntax: base.QuerySyntax = None, + exec_mode: base.QueryExecMode = None, + parameters: dict = None, + concurrent_result_sets: bool = False, + ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: + request = base.create_execute_query_request( + query=query, + session_id=self._session_state.session_id, + commit_tx=commit_tx, + tx_id=self._tx_state.tx_id, + tx_mode=self._tx_state.tx_mode, + syntax=syntax, + exec_mode=exec_mode, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + ) + + return self._driver( + request.to_proto(), + _apis.QueryService.Stub, + _apis.QueryService.ExecuteQuery, + ) + + def _ensure_prev_stream_finished(self) -> None: + if self._prev_stream is not None: + for _ in self._prev_stream: + pass + self._prev_stream = None + + def _move_to_beginned(self, tx_id: str) -> None: + if self._tx_state._already_in(QueryTxStateEnum.BEGINED): + return + self._tx_state._change_state(QueryTxStateEnum.BEGINED) + self._tx_state.tx_id = tx_id + + def _move_to_commited(self) -> None: + if self._tx_state._already_in(QueryTxStateEnum.COMMITTED): + return + self._tx_state._change_state(QueryTxStateEnum.COMMITTED) + + def begin(self, settings: Optional[base.QueryClientSettings] = None) -> None: + """WARNING: This API is experimental and could be changed. + + Explicitly begins a transaction + + :param settings: A request settings + + :return: None or exception if begin is failed + """ + self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED) + + self._begin_call(settings) + + def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: + """WARNING: This API is experimental and could be changed. + + Calls commit on a transaction if it is open otherwise is no-op. If transaction execution + failed then this method raises PreconditionFailed. + + :param settings: A request settings + + :return: A committed transaction or exception if commit is failed + """ + if self._tx_state._already_in(QueryTxStateEnum.COMMITTED): + return + self._ensure_prev_stream_finished() + + if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: + self._tx_state._change_state(QueryTxStateEnum.COMMITTED) + return + + self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED) + + self._commit_call(settings) + + def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: + if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED): + return + + self._ensure_prev_stream_finished() + + if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: + self._tx_state._change_state(QueryTxStateEnum.ROLLBACKED) + return + + self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED) + + self._rollback_call(settings) + + def execute( + self, + query: str, + commit_tx: Optional[bool] = False, + syntax: Optional[base.QuerySyntax] = None, + exec_mode: Optional[base.QueryExecMode] = None, + parameters: Optional[dict] = None, + concurrent_result_sets: Optional[bool] = False, + ) -> base.SyncResponseContextIterator: + """WARNING: This API is experimental and could be changed. + + Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. + :param commit_tx: A special flag that allows transaction commit. + :param syntax: Syntax of the query, which is a one from the following choises: + 1) QuerySyntax.YQL_V1, which is default; + 2) QuerySyntax.PG. + :param exec_mode: Exec mode of the query, which is a one from the following choises: + 1) QueryExecMode.EXECUTE, which is default; + 2) QueryExecMode.EXPLAIN; + 3) QueryExecMode.VALIDATE; + 4) QueryExecMode.PARSE. + :param parameters: dict with parameters and YDB types; + :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + + :return: Iterator with result sets + """ + self._ensure_prev_stream_finished() + self._tx_state._check_tx_ready_to_use() + + stream_it = self._execute_call( + query=query, + commit_tx=commit_tx, + syntax=syntax, + exec_mode=exec_mode, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + ) + self._prev_stream = base.SyncResponseContextIterator( + stream_it, + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + tx=self, + commit_tx=commit_tx, + ), + ) + return self._prev_stream diff --git a/contrib/python/ydb/py3/ydb/retries.py b/contrib/python/ydb/py3/ydb/retries.py new file mode 100644 index 0000000000..5d4f6e6a0f --- /dev/null +++ b/contrib/python/ydb/py3/ydb/retries.py @@ -0,0 +1,136 @@ +import random +import time + +from . import issues +from ._errors import check_retriable_error + + +class BackoffSettings(object): + def __init__(self, ceiling=6, slot_duration=0.001, uncertain_ratio=0.5): + self.ceiling = ceiling + self.slot_duration = slot_duration + self.uncertain_ratio = uncertain_ratio + + def calc_timeout(self, retry_number): + slots_count = 1 << min(retry_number, self.ceiling) + max_duration_ms = slots_count * self.slot_duration * 1000.0 + # duration_ms = random.random() * max_duration_ms * uncertain_ratio) + max_duration_ms * (1 - uncertain_ratio) + duration_ms = max_duration_ms * (random.random() * self.uncertain_ratio + 1.0 - self.uncertain_ratio) + return duration_ms / 1000.0 + + +class RetrySettings(object): + def __init__( + self, + max_retries=10, + max_session_acquire_timeout=None, + on_ydb_error_callback=None, + backoff_ceiling=6, + backoff_slot_duration=1, + get_session_client_timeout=5, + fast_backoff_settings=None, + slow_backoff_settings=None, + idempotent=False, + ): + self.max_retries = max_retries + self.max_session_acquire_timeout = max_session_acquire_timeout + self.on_ydb_error_callback = (lambda e: None) if on_ydb_error_callback is None else on_ydb_error_callback + self.fast_backoff = BackoffSettings(10, 0.005) if fast_backoff_settings is None else fast_backoff_settings + self.slow_backoff = ( + BackoffSettings(backoff_ceiling, backoff_slot_duration) + if slow_backoff_settings is None + else slow_backoff_settings + ) + self.retry_not_found = True + self.idempotent = idempotent + self.retry_internal_error = True + self.unknown_error_handler = lambda e: None + self.get_session_client_timeout = get_session_client_timeout + if max_session_acquire_timeout is not None: + self.get_session_client_timeout = min(self.max_session_acquire_timeout, self.get_session_client_timeout) + + def with_fast_backoff(self, backoff_settings): + self.fast_backoff = backoff_settings + return self + + def with_slow_backoff(self, backoff_settings): + self.slow_backoff = backoff_settings + return self + + +class YdbRetryOperationSleepOpt(object): + def __init__(self, timeout): + self.timeout = timeout + + def __eq__(self, other): + return type(self) == type(other) and self.timeout == other.timeout + + def __repr__(self): + return "YdbRetryOperationSleepOpt(%s)" % self.timeout + + +class YdbRetryOperationFinalResult(object): + def __init__(self, result): + self.result = result + self.exc = None + + def __eq__(self, other): + return type(self) == type(other) and self.result == other.result and self.exc == other.exc + + def __repr__(self): + return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc) + + def set_exception(self, exc): + self.exc = exc + + +def retry_operation_impl(callee, retry_settings=None, *args, **kwargs): + retry_settings = RetrySettings() if retry_settings is None else retry_settings + status = None + + for attempt in range(retry_settings.max_retries + 1): + try: + result = YdbRetryOperationFinalResult(callee(*args, **kwargs)) + yield result + + if result.exc is not None: + raise result.exc + + except issues.Error as e: + status = e + retry_settings.on_ydb_error_callback(e) + + retriable_info = check_retriable_error(e, retry_settings, attempt) + if not retriable_info.is_retriable: + raise + + skip_yield_error_types = [ + issues.Aborted, + issues.BadSession, + issues.NotFound, + issues.InternalError, + ] + + yield_sleep = True + for t in skip_yield_error_types: + if isinstance(e, t): + yield_sleep = False + + if yield_sleep: + yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds) + + except Exception as e: + # you should provide your own handler you want + retry_settings.unknown_error_handler(e) + raise + + raise status + + +def retry_operation_sync(callee, retry_settings=None, *args, **kwargs): + opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs) + for next_opt in opt_generator: + if isinstance(next_opt, YdbRetryOperationSleepOpt): + time.sleep(next_opt.timeout) + else: + return next_opt.result diff --git a/contrib/python/ydb/py3/ydb/table.py b/contrib/python/ydb/py3/ydb/table.py index c21392bb4c..ac9f93042c 100644 --- a/contrib/python/ydb/py3/ydb/table.py +++ b/contrib/python/ydb/py3/ydb/table.py @@ -3,8 +3,6 @@ import abc import ydb from abc import abstractmethod import logging -import time -import random import enum from . import ( @@ -20,7 +18,15 @@ from . import ( _tx_ctx_impl, tracing, ) -from ._errors import check_retriable_error + +from .retries import ( + YdbRetryOperationFinalResult, # noqa + YdbRetryOperationSleepOpt, # noqa + BackoffSettings, # noqa + retry_operation_impl, # noqa + RetrySettings, + retry_operation_sync, +) try: from . import interceptor @@ -840,137 +846,6 @@ class StaleReadOnly(AbstractTransactionModeBuilder): return self._name -class BackoffSettings(object): - def __init__(self, ceiling=6, slot_duration=0.001, uncertain_ratio=0.5): - self.ceiling = ceiling - self.slot_duration = slot_duration - self.uncertain_ratio = uncertain_ratio - - def calc_timeout(self, retry_number): - slots_count = 1 << min(retry_number, self.ceiling) - max_duration_ms = slots_count * self.slot_duration * 1000.0 - # duration_ms = random.random() * max_duration_ms * uncertain_ratio) + max_duration_ms * (1 - uncertain_ratio) - duration_ms = max_duration_ms * (random.random() * self.uncertain_ratio + 1.0 - self.uncertain_ratio) - return duration_ms / 1000.0 - - -class RetrySettings(object): - def __init__( - self, - max_retries=10, - max_session_acquire_timeout=None, - on_ydb_error_callback=None, - backoff_ceiling=6, - backoff_slot_duration=1, - get_session_client_timeout=5, - fast_backoff_settings=None, - slow_backoff_settings=None, - idempotent=False, - ): - self.max_retries = max_retries - self.max_session_acquire_timeout = max_session_acquire_timeout - self.on_ydb_error_callback = (lambda e: None) if on_ydb_error_callback is None else on_ydb_error_callback - self.fast_backoff = BackoffSettings(10, 0.005) if fast_backoff_settings is None else fast_backoff_settings - self.slow_backoff = ( - BackoffSettings(backoff_ceiling, backoff_slot_duration) - if slow_backoff_settings is None - else slow_backoff_settings - ) - self.retry_not_found = True - self.idempotent = idempotent - self.retry_internal_error = True - self.unknown_error_handler = lambda e: None - self.get_session_client_timeout = get_session_client_timeout - if max_session_acquire_timeout is not None: - self.get_session_client_timeout = min(self.max_session_acquire_timeout, self.get_session_client_timeout) - - def with_fast_backoff(self, backoff_settings): - self.fast_backoff = backoff_settings - return self - - def with_slow_backoff(self, backoff_settings): - self.slow_backoff = backoff_settings - return self - - -class YdbRetryOperationSleepOpt(object): - def __init__(self, timeout): - self.timeout = timeout - - def __eq__(self, other): - return type(self) == type(other) and self.timeout == other.timeout - - def __repr__(self): - return "YdbRetryOperationSleepOpt(%s)" % self.timeout - - -class YdbRetryOperationFinalResult(object): - def __init__(self, result): - self.result = result - self.exc = None - - def __eq__(self, other): - return type(self) == type(other) and self.result == other.result and self.exc == other.exc - - def __repr__(self): - return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc) - - def set_exception(self, exc): - self.exc = exc - - -def retry_operation_impl(callee, retry_settings=None, *args, **kwargs): - retry_settings = RetrySettings() if retry_settings is None else retry_settings - status = None - - for attempt in range(retry_settings.max_retries + 1): - try: - result = YdbRetryOperationFinalResult(callee(*args, **kwargs)) - yield result - - if result.exc is not None: - raise result.exc - - except issues.Error as e: - status = e - retry_settings.on_ydb_error_callback(e) - - retriable_info = check_retriable_error(e, retry_settings, attempt) - if not retriable_info.is_retriable: - raise - - skip_yield_error_types = [ - issues.Aborted, - issues.BadSession, - issues.NotFound, - issues.InternalError, - ] - - yield_sleep = True - for t in skip_yield_error_types: - if isinstance(e, t): - yield_sleep = False - - if yield_sleep: - yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds) - - except Exception as e: - # you should provide your own handler you want - retry_settings.unknown_error_handler(e) - raise - - raise status - - -def retry_operation_sync(callee, retry_settings=None, *args, **kwargs): - opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs) - for next_opt in opt_generator: - if isinstance(next_opt, YdbRetryOperationSleepOpt): - time.sleep(next_opt.timeout) - else: - return next_opt.result - - class TableClientSettings(object): def __init__(self): self._client_query_cache_enabled = False diff --git a/contrib/python/ydb/py3/ydb/types.py b/contrib/python/ydb/py3/ydb/types.py index 49792ed39e..a48548640c 100644 --- a/contrib/python/ydb/py3/ydb/types.py +++ b/contrib/python/ydb/py3/ydb/types.py @@ -2,6 +2,7 @@ from __future__ import annotations import abc +from dataclasses import dataclass import enum import json from . import _utilities, _apis @@ -440,3 +441,9 @@ class BulkUpsertColumns(AbstractTypeBuilder): def __str__(self): return "BulkUpsertColumns<%s>" % ",".join(self.__columns_repr) + + +@dataclass +class TypedValue: + value: typing.Any + value_type: typing.Optional[typing.Union[PrimitiveType, AbstractTypeBuilder]] = None diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 2edc0629d7..567cda12c7 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.13.0" +VERSION = "3.15.0" |