diff options
author | rekby <rekby@ydb.tech> | 2023-03-17 12:21:37 +0300 |
---|---|---|
committer | rekby <rekby@ydb.tech> | 2023-03-17 12:21:37 +0300 |
commit | 2bd3a83a0964ef79798bc5a756c4d94afb6dd4e4 (patch) | |
tree | 94bf4b0bdfa4e084a36722a8f7627376d8741318 | |
parent | 781b388f6f446cefcaa7fa4d787342b56e584f8d (diff) | |
download | ydb-2bd3a83a0964ef79798bc5a756c4d94afb6dd4e4.tar.gz |
Sync ydb python sdk from git 2.13.2
23 files changed, 579 insertions, 93 deletions
diff --git a/ydb/public/sdk/python3/README.md b/ydb/public/sdk/python3/README.md new file mode 100644 index 0000000000..cfc57eb276 --- /dev/null +++ b/ydb/public/sdk/python3/README.md @@ -0,0 +1,37 @@ +YDB Python SDK +--- +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/ydb-platform/ydb/blob/main/LICENSE) +[![PyPI version](https://badge.fury.io/py/ydb.svg)](https://badge.fury.io/py/ydb) +[![Functional tests](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml) +[![Style checks](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml) + +Officially supported Python client for YDB. + +## Quickstart + +### Prerequisites + +- Python 3.8 or higher +- `pip` version 9.0.1 or higher + +If necessary, upgrade your version of `pip`: + +```sh +$ python -m pip install --upgrade pip +``` + +If you cannot upgrade `pip` due to a system-owned installation, you can +run the example in a virtualenv: + +```sh +$ python -m pip install virtualenv +$ virtualenv venv +$ source venv/bin/activate +$ python -m pip install --upgrade pip +``` + +Install YDB python sdk: + +```sh +$ python -m pip install ydb +``` diff --git a/ydb/public/sdk/python3/ya.make b/ydb/public/sdk/python3/ya.make index 6b777ed881..c91708c8d7 100644 --- a/ydb/public/sdk/python3/ya.make +++ b/ydb/public/sdk/python3/ya.make @@ -4,6 +4,9 @@ PY_SRCS( TOP_LEVEL ydb/__init__.py ydb/_apis.py + ydb/_errors.py + ydb/_grpc/__init__.py + ydb/_grpc/common/__init__.py ydb/_session_impl.py ydb/_sp_impl.py ydb/_tx_ctx_impl.py @@ -29,6 +32,7 @@ PY_SRCS( ydb/default_pem.py ydb/driver.py ydb/export.py + ydb/global_settings.py ydb/iam/__init__.py ydb/iam/auth.py ydb/import_client.py @@ -38,11 +42,13 @@ PY_SRCS( ydb/pool.py ydb/resolver.py ydb/scheme.py + ydb/scheme_test.py ydb/scripting.py ydb/settings.py ydb/sqlalchemy/__init__.py ydb/sqlalchemy/types.py ydb/table.py + ydb/table_test.py ydb/tornado/__init__.py ydb/tornado/tornado_helpers.py ydb/tracing.py diff --git a/ydb/public/sdk/python3/ydb/__init__.py b/ydb/public/sdk/python3/ydb/__init__.py index d8c23fee4c..56b73478eb 100644 --- a/ydb/public/sdk/python3/ydb/__init__.py +++ b/ydb/public/sdk/python3/ydb/__init__.py @@ -1,5 +1,6 @@ from .credentials import * # noqa from .driver import * # noqa +from .global_settings import * # noqa from .table import * # noqa from .issues import * # noqa from .types import * # noqa diff --git a/ydb/public/sdk/python3/ydb/_apis.py b/ydb/public/sdk/python3/ydb/_apis.py index aa496d6b29..6f2fc3ab6a 100644 --- a/ydb/public/sdk/python3/ydb/_apis.py +++ b/ydb/public/sdk/python3/ydb/_apis.py @@ -1,21 +1,42 @@ # -*- coding: utf-8 -*- -from ydb.public.api.grpc import ( - ydb_cms_v1_pb2_grpc, - ydb_discovery_v1_pb2_grpc, - ydb_scheme_v1_pb2_grpc, - ydb_table_v1_pb2_grpc, -) -from ydb.public.api.protos import ( - ydb_status_codes_pb2, - ydb_discovery_pb2, - ydb_scheme_pb2, - ydb_table_pb2, - ydb_value_pb2, -) -from ydb.public.api.protos import ydb_operation_pb2 -from ydb.public.api.protos import ydb_common_pb2 -from ydb.public.api.grpc import ydb_operation_v1_pb2_grpc +# Workaround for good IDE and universal for runtime +# noinspection PyUnreachableCode +if False: + from ._grpc.v4 import ( + ydb_cms_v1_pb2_grpc, + ydb_discovery_v1_pb2_grpc, + ydb_scheme_v1_pb2_grpc, + ydb_table_v1_pb2_grpc, + ydb_operation_v1_pb2_grpc, + ) + from ._grpc.v4.protos import ( + ydb_status_codes_pb2, + ydb_discovery_pb2, + ydb_scheme_pb2, + ydb_table_pb2, + ydb_value_pb2, + ydb_operation_pb2, + ydb_common_pb2, + ) +else: + from ._grpc.common import ( + ydb_cms_v1_pb2_grpc, + ydb_discovery_v1_pb2_grpc, + ydb_scheme_v1_pb2_grpc, + ydb_table_v1_pb2_grpc, + ydb_operation_v1_pb2_grpc, + ) + + from ._grpc.common.protos import ( + ydb_status_codes_pb2, + ydb_discovery_pb2, + ydb_scheme_pb2, + ydb_table_pb2, + ydb_value_pb2, + ydb_operation_pb2, + ydb_common_pb2, + ) StatusIds = ydb_status_codes_pb2.StatusIds FeatureFlag = ydb_common_pb2.FeatureFlag diff --git a/ydb/public/sdk/python3/ydb/_errors.py b/ydb/public/sdk/python3/ydb/_errors.py new file mode 100644 index 0000000000..ae3057b6d2 --- /dev/null +++ b/ydb/public/sdk/python3/ydb/_errors.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass +from typing import Optional + +from ydb import issues + +_errors_retriable_fast_backoff_types = [ + issues.Unavailable, +] +_errors_retriable_slow_backoff_types = [ + issues.Aborted, + issues.BadSession, + issues.Overloaded, + issues.SessionPoolEmpty, + issues.ConnectionError, +] +_errors_retriable_slow_backoff_idempotent_types = [ + issues.Undetermined, +] + + +def check_retriable_error(err, retry_settings, attempt): + if isinstance(err, issues.NotFound): + if retry_settings.retry_not_found: + return ErrorRetryInfo( + True, retry_settings.fast_backoff.calc_timeout(attempt) + ) + else: + return ErrorRetryInfo(False, None) + + if isinstance(err, issues.InternalError): + if retry_settings.retry_internal_error: + return ErrorRetryInfo( + True, retry_settings.slow_backoff.calc_timeout(attempt) + ) + else: + return ErrorRetryInfo(False, None) + + for t in _errors_retriable_fast_backoff_types: + if isinstance(err, t): + return ErrorRetryInfo( + True, retry_settings.fast_backoff.calc_timeout(attempt) + ) + + for t in _errors_retriable_slow_backoff_types: + if isinstance(err, t): + return ErrorRetryInfo( + True, retry_settings.slow_backoff.calc_timeout(attempt) + ) + + if retry_settings.idempotent: + for t in _errors_retriable_slow_backoff_idempotent_types: + if isinstance(err, t): + return ErrorRetryInfo( + True, retry_settings.slow_backoff.calc_timeout(attempt) + ) + + return ErrorRetryInfo(False, None) + + +@dataclass +class ErrorRetryInfo: + is_retriable: bool + sleep_timeout_seconds: Optional[float] diff --git a/ydb/public/sdk/python3/ydb/_grpc/__init__.py b/ydb/public/sdk/python3/ydb/_grpc/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/ydb/public/sdk/python3/ydb/_grpc/__init__.py diff --git a/ydb/public/sdk/python3/ydb/_grpc/common/__init__.py b/ydb/public/sdk/python3/ydb/_grpc/common/__init__.py new file mode 100644 index 0000000000..689535172c --- /dev/null +++ b/ydb/public/sdk/python3/ydb/_grpc/common/__init__.py @@ -0,0 +1,6 @@ +import sys + +from ydb.public.api.grpc import * # noqa +sys.modules["ydb._grpc.common"] = sys.modules["ydb.public.api.grpc"] +from ydb.public.api import protos # noqa +sys.modules["ydb._grpc.common.protos"] = sys.modules["ydb.public.api.protos"] diff --git a/ydb/public/sdk/python3/ydb/aio/iam.py b/ydb/public/sdk/python3/ydb/aio/iam.py index a2da48b782..51b650f24b 100644 --- a/ydb/public/sdk/python3/ydb/aio/iam.py +++ b/ydb/public/sdk/python3/ydb/aio/iam.py @@ -60,6 +60,11 @@ class TokenServiceCredentials(AbstractExpiringTokenCredentials): return {"access_token": response.iam_token, "expires_in": expires_in} +# IamTokenCredentials need for backward compatibility +# Deprecated +IamTokenCredentials = TokenServiceCredentials + + class JWTIamCredentials(TokenServiceCredentials, auth.BaseJWTCredentials): def __init__( self, @@ -121,10 +126,13 @@ class MetadataUrlCredentials(AbstractExpiringTokenCredentials): ) as response: if not response.ok: self.logger.error( - "Error while getting token from metadata: %s" % response.text() + "Error while getting token from metadata: %s" + % await response.text() ) response.raise_for_status() - return await response.json() + # response from default metadata credentials provider + # contains text/plain content type. + return await response.json(content_type=None) class ServiceAccountCredentials(JWTIamCredentials): diff --git a/ydb/public/sdk/python3/ydb/aio/table.py b/ydb/public/sdk/python3/ydb/aio/table.py index 9df797eaf0..f937a9283c 100644 --- a/ydb/public/sdk/python3/ydb/aio/table.py +++ b/ydb/public/sdk/python3/ydb/aio/table.py @@ -13,6 +13,7 @@ from ydb.table import ( _scan_query_request_factory, _wrap_scan_query_response, BaseTxContext, + _allow_split_transaction, ) from . import _utilities from ydb import _apis, _session_impl @@ -120,8 +121,16 @@ class Session(BaseSession): set_read_replicas_settings, ) - def transaction(self, tx_mode=None): - return TxContext(self._driver, self._state, self, tx_mode) + def transaction( + self, tx_mode=None, *, allow_split_transactions=_allow_split_transaction + ): + return TxContext( + self._driver, + self._state, + self, + tx_mode, + allow_split_transactions=allow_split_transactions, + ) async def describe_table(self, path, settings=None): # pylint: disable=W0236 return await super().describe_table(path, settings) @@ -184,6 +193,9 @@ class TxContext(BaseTxContext): async def execute( self, query, parameters=None, commit_tx=False, settings=None ): # pylint: disable=W0236 + + self._check_split() + return await super().execute(query, parameters, commit_tx, settings) async def commit(self, settings=None): # pylint: disable=W0236 diff --git a/ydb/public/sdk/python3/ydb/convert.py b/ydb/public/sdk/python3/ydb/convert.py index cccb5641e1..b231bb1091 100644 --- a/ydb/public/sdk/python3/ydb/convert.py +++ b/ydb/public/sdk/python3/ydb/convert.py @@ -13,6 +13,7 @@ _DecimalNanRepr = 10**35 + 1 _DecimalInfRepr = 10**35 _DecimalSignedInfRepr = -(10**35) _primitive_type_by_id = {} +_default_allow_truncated_result = True def _initialize(): @@ -214,9 +215,10 @@ def _dict_to_pb(type_pb, value): for key, payload in value.items(): kv_pair = value_pb.pairs.add() kv_pair.key.MergeFrom(_from_native_value(type_pb.dict_type.key, key)) - kv_pair.payload.MergeFrom( - _from_native_value(type_pb.dict_type.payload, payload) - ) + if payload: + kv_pair.payload.MergeFrom( + _from_native_value(type_pb.dict_type.payload, payload) + ) return value_pb @@ -491,10 +493,20 @@ class ResultSets(list): if table_client_settings is None else table_client_settings._make_result_sets_lazy ) + + allow_truncated_result = _default_allow_truncated_result + if table_client_settings: + allow_truncated_result = table_client_settings._allow_truncated_result + result_sets = [] initializer = ( _ResultSet.from_message if not make_lazy else _ResultSet.lazy_from_message ) for result_set in result_sets_pb: - result_sets.append(initializer(result_set, table_client_settings)) + result_set = initializer(result_set, table_client_settings) + if result_set.truncated and not allow_truncated_result: + raise issues.TruncatedResponseError( + "Response for the request was truncated by server" + ) + result_sets.append(result_set) super(ResultSets, self).__init__(result_sets) diff --git a/ydb/public/sdk/python3/ydb/credentials.py b/ydb/public/sdk/python3/ydb/credentials.py index 96247ef32e..8e22fe2a84 100644 --- a/ydb/public/sdk/python3/ydb/credentials.py +++ b/ydb/public/sdk/python3/ydb/credentials.py @@ -7,8 +7,15 @@ import threading from concurrent import futures import logging import time -from ydb.public.api.protos import ydb_auth_pb2 -from ydb.public.api.grpc import ydb_auth_v1_pb2_grpc + +# Workaround for good IDE and universal for runtime +# noinspection PyUnreachableCode +if False: + from ._grpc.v4.protos import ydb_auth_pb2 + from ._grpc.v4 import ydb_auth_v1_pb2_grpc +else: + from ._grpc.common.protos import ydb_auth_pb2 + from ._grpc.common import ydb_auth_v1_pb2_grpc YDB_AUTH_TICKET_HEADER = "x-ydb-auth-ticket" diff --git a/ydb/public/sdk/python3/ydb/export.py b/ydb/public/sdk/python3/ydb/export.py index 43d326e1eb..30898cbb42 100644 --- a/ydb/public/sdk/python3/ydb/export.py +++ b/ydb/public/sdk/python3/ydb/export.py @@ -3,8 +3,16 @@ import enum from . import _apis from . import settings_impl as s_impl -from ydb.public.api.protos import ydb_export_pb2 -from ydb.public.api.grpc import ydb_export_v1_pb2_grpc + +# Workaround for good IDE and universal for runtime +# noinspection PyUnreachableCode +if False: + from ._grpc.v4.protos import ydb_export_pb2 + from ._grpc.v4 import ydb_export_v1_pb2_grpc +else: + from ._grpc.common.protos import ydb_export_pb2 + from ._grpc.common import ydb_export_v1_pb2_grpc + from . import operation _ExportToYt = "ExportToYt" diff --git a/ydb/public/sdk/python3/ydb/global_settings.py b/ydb/public/sdk/python3/ydb/global_settings.py new file mode 100644 index 0000000000..de8b0b1b06 --- /dev/null +++ b/ydb/public/sdk/python3/ydb/global_settings.py @@ -0,0 +1,16 @@ +from . import convert +from . import table + + +def global_allow_truncated_result(enabled: bool = True): + """ + call global_allow_truncated_result(False) for more safe execution and compatible with future changes + """ + convert._default_allow_truncated_result = enabled + + +def global_allow_split_transactions(enabled: bool): + """ + call global_allow_truncated_result(False) for more safe execution and compatible with future changes + """ + table._allow_split_transaction = enabled diff --git a/ydb/public/sdk/python3/ydb/import_client.py b/ydb/public/sdk/python3/ydb/import_client.py index 160b0a6bed..d1ccc99af6 100644 --- a/ydb/public/sdk/python3/ydb/import_client.py +++ b/ydb/public/sdk/python3/ydb/import_client.py @@ -3,8 +3,17 @@ import enum from . import _apis from . import settings_impl as s_impl -from ydb.public.api.protos import ydb_import_pb2 -from ydb.public.api.grpc import ydb_import_v1_pb2_grpc + +# Workaround for good IDE and universal for runtime +# noinspection PyUnreachableCode +if False: + from ._grpc.v4.protos import ydb_import_pb2 + from ._grpc.v4 import ydb_import_v1_pb2_grpc +else: + from ._grpc.common.protos import ydb_import_pb2 + from ._grpc.common import ydb_import_v1_pb2_grpc + + from . import operation _ImportFromS3 = "ImportFromS3" diff --git a/ydb/public/sdk/python3/ydb/issues.py b/ydb/public/sdk/python3/ydb/issues.py index 727aff1bf7..0a0d6a907e 100644 --- a/ydb/public/sdk/python3/ydb/issues.py +++ b/ydb/public/sdk/python3/ydb/issues.py @@ -52,6 +52,10 @@ class Error(Exception): self.message = message +class TruncatedResponseError(Error): + status = None + + class ConnectionError(Error): status = None diff --git a/ydb/public/sdk/python3/ydb/scheme.py b/ydb/public/sdk/python3/ydb/scheme.py index a78357e030..88eca78c77 100644 --- a/ydb/public/sdk/python3/ydb/scheme.py +++ b/ydb/public/sdk/python3/ydb/scheme.py @@ -12,6 +12,7 @@ class SchemeEntryType(enum.IntEnum): Enumerates all available entry types. """ + TYPE_UNSPECIFIED = 0 DIRECTORY = 1 TABLE = 2 PERS_QUEUE_GROUP = 3 @@ -24,6 +25,10 @@ class SchemeEntryType(enum.IntEnum): REPLICATION = 16 TOPIC = 17 + @classmethod + def _missing_(cls, value): + return cls.TYPE_UNSPECIFIED + @staticmethod def is_table(entry): """ @@ -299,7 +304,7 @@ def _wrap_scheme_entry(entry_pb, scheme_entry_cls=None, *args, **kwargs): return scheme_entry_cls( entry_pb.name, entry_pb.owner, - getattr(SchemeEntryType, _apis.ydb_scheme.Entry.Type.Name(entry_pb.type)), + SchemeEntryType(entry_pb.type), _wrap_permissions(entry_pb.effective_permissions), _wrap_permissions(entry_pb.permissions), entry_pb.size_bytes, @@ -318,7 +323,7 @@ def _wrap_list_directory_response(rpc_state, response): message = _apis.ydb_scheme.ListDirectoryResult() response.operation.result.Unpack(message) children = [] - supported_items = set([i.value for i in SchemeEntryType]) + supported_items = set(i.value for i in SchemeEntryType) for children_item in message.children: if children_item.type not in supported_items: continue @@ -328,12 +333,10 @@ def _wrap_list_directory_response(rpc_state, response): return Directory( message.self.name, message.self.owner, - getattr(SchemeEntryType, _apis.ydb_scheme.Entry.Type.Name(message.self.type)), + SchemeEntryType(message.self.type), _wrap_permissions(message.self.effective_permissions), _wrap_permissions(message.self.permissions), - tuple( - children, - ), + tuple(children), ) diff --git a/ydb/public/sdk/python3/ydb/scheme_test.py b/ydb/public/sdk/python3/ydb/scheme_test.py new file mode 100644 index 0000000000..7909488292 --- /dev/null +++ b/ydb/public/sdk/python3/ydb/scheme_test.py @@ -0,0 +1,30 @@ +from ydb.scheme import ( + SchemeEntryType, + _wrap_scheme_entry, + _wrap_list_directory_response, +) +from ydb._apis import ydb_scheme + + +def test_wrap_scheme_entry(): + assert ( + _wrap_scheme_entry(ydb_scheme.Entry(type=1)).type is SchemeEntryType.DIRECTORY + ) + assert _wrap_scheme_entry(ydb_scheme.Entry(type=17)).type is SchemeEntryType.TOPIC + + assert ( + _wrap_scheme_entry(ydb_scheme.Entry()).type is SchemeEntryType.TYPE_UNSPECIFIED + ) + assert ( + _wrap_scheme_entry(ydb_scheme.Entry(type=10)).type + is SchemeEntryType.TYPE_UNSPECIFIED + ) + assert ( + _wrap_scheme_entry(ydb_scheme.Entry(type=1001)).type + is SchemeEntryType.TYPE_UNSPECIFIED + ) + + +def test_wrap_list_directory_response(): + d = _wrap_list_directory_response(None, ydb_scheme.ListDirectoryResponse()) + assert d.type is SchemeEntryType.TYPE_UNSPECIFIED diff --git a/ydb/public/sdk/python3/ydb/scripting.py b/ydb/public/sdk/python3/ydb/scripting.py index 587630f324..9fed037aec 100644 --- a/ydb/public/sdk/python3/ydb/scripting.py +++ b/ydb/public/sdk/python3/ydb/scripting.py @@ -1,5 +1,13 @@ -from ydb.public.api.protos import ydb_scripting_pb2 -from ydb.public.api.grpc import ydb_scripting_v1_pb2_grpc +# Workaround for good IDE and universal for runtime +# noinspection PyUnreachableCode +if False: + from ._grpc.v4.protos import ydb_scripting_pb2 + from ._grpc.v4 import ydb_scripting_v1_pb2_grpc +else: + from ._grpc.common.protos import ydb_scripting_pb2 + from ._grpc.common import ydb_scripting_v1_pb2_grpc + + from . import issues, convert, settings diff --git a/ydb/public/sdk/python3/ydb/table.py b/ydb/public/sdk/python3/ydb/table.py index 2241de3590..660959bfc6 100644 --- a/ydb/public/sdk/python3/ydb/table.py +++ b/ydb/public/sdk/python3/ydb/table.py @@ -21,12 +21,15 @@ from . import ( _tx_ctx_impl, tracing, ) +from ._errors import check_retriable_error try: from . import interceptor except ImportError: interceptor = None +_allow_split_transaction = True + logger = logging.getLogger(__name__) ################################################################## @@ -780,6 +783,22 @@ class AbstractTransactionModeBuilder(object): pass +class SnapshotReadOnly(AbstractTransactionModeBuilder): + __slots__ = ("_pb", "_name") + + def __init__(self): + self._pb = _apis.ydb_table.SnapshotModeSettings() + self._name = "snapshot_read_only" + + @property + def settings(self): + return self._pb + + @property + def name(self): + return self._name + + class SerializableReadWrite(AbstractTransactionModeBuilder): __slots__ = ("_pb", "_name") @@ -900,12 +919,28 @@ class YdbRetryOperationSleepOpt(object): def __init__(self, timeout): self.timeout = timeout + def __eq__(self, other): + return type(self) == type(other) and self.timeout == other.timeout + + def __repr__(self): + return "YdbRetryOperationSleepOpt(%s)" % self.timeout + class YdbRetryOperationFinalResult(object): def __init__(self, result): self.result = result self.exc = None + def __eq__(self, other): + return ( + type(self) == type(other) + and self.result == other.result + and self.exc == other.exc + ) + + def __repr__(self): + return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc) + def set_exception(self, exc): self.exc = exc @@ -922,56 +957,28 @@ def retry_operation_impl(callee, retry_settings=None, *args, **kwargs): if result.exc is not None: raise result.exc - except ( - issues.Aborted, - issues.BadSession, - issues.NotFound, - issues.InternalError, - ) as e: - status = e - retry_settings.on_ydb_error_callback(e) - - if isinstance(e, issues.NotFound) and not retry_settings.retry_not_found: - raise e - - if ( - isinstance(e, issues.InternalError) - and not retry_settings.retry_internal_error - ): - raise e - - except ( - issues.Overloaded, - issues.SessionPoolEmpty, - issues.ConnectionError, - ) as e: - status = e - retry_settings.on_ydb_error_callback(e) - yield YdbRetryOperationSleepOpt( - retry_settings.slow_backoff.calc_timeout(attempt) - ) - - except issues.Unavailable as e: + except issues.Error as e: status = e retry_settings.on_ydb_error_callback(e) - yield YdbRetryOperationSleepOpt( - retry_settings.fast_backoff.calc_timeout(attempt) - ) - except issues.Undetermined as e: - status = e - retry_settings.on_ydb_error_callback(e) - if not retry_settings.idempotent: - # operation is not idempotent, so we cannot retry. + retriable_info = check_retriable_error(e, retry_settings, attempt) + if not retriable_info.is_retriable: raise - yield YdbRetryOperationSleepOpt( - retry_settings.fast_backoff.calc_timeout(attempt) - ) + skip_yield_error_types = [ + issues.Aborted, + issues.BadSession, + issues.NotFound, + issues.InternalError, + ] - except issues.Error as e: - retry_settings.on_ydb_error_callback(e) - raise + yield_sleep = True + for t in skip_yield_error_types: + if isinstance(e, t): + yield_sleep = False + + if yield_sleep: + yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds) except Exception as e: # you should provide your own handler you want @@ -999,6 +1006,7 @@ class TableClientSettings(object): self._native_json_in_result_sets = False self._native_interval_in_result_sets = False self._native_timestamp_in_result_sets = False + self._allow_truncated_result = convert._default_allow_truncated_result def with_native_timestamp_in_result_sets(self, enabled): # type:(bool) -> ydb.TableClientSettings @@ -1035,6 +1043,11 @@ class TableClientSettings(object): self._make_result_sets_lazy = enabled return self + def with_allow_truncated_result(self, enabled): + # type:(bool) -> ydb.TableClientSettings + self._allow_truncated_result = enabled + return self + class ScanQueryResult(object): def __init__(self, result, table_client_settings): @@ -1171,7 +1184,9 @@ class ISession: pass @abstractmethod - def transaction(self, tx_mode=None): + def transaction( + self, tx_mode=None, allow_split_transactions=_allow_split_transaction + ): pass @abstractmethod @@ -1676,8 +1691,16 @@ class BaseSession(ISession): self._state.endpoint, ) - def transaction(self, tx_mode=None): - return TxContext(self._driver, self._state, self, tx_mode) + def transaction( + self, tx_mode=None, allow_split_transactions=_allow_split_transaction + ): + return TxContext( + self._driver, + self._state, + self, + tx_mode, + allow_split_transactions=allow_split_transactions, + ) def has_prepared(self, query): return query in self._state @@ -2189,9 +2212,27 @@ class ITxContext: class BaseTxContext(ITxContext): - __slots__ = ("_tx_state", "_session_state", "_driver", "session") + __slots__ = ( + "_tx_state", + "_session_state", + "_driver", + "session", + "_finished", + "_allow_split_transactions", + ) - def __init__(self, driver, session_state, session, tx_mode=None): + _COMMIT = "commit" + _ROLLBACK = "rollback" + + def __init__( + self, + driver, + session_state, + session, + tx_mode=None, + *, + allow_split_transactions=_allow_split_transaction + ): """ An object that provides a simple transaction context manager that allows statements execution in a transaction. You don't have to open transaction explicitly, because context manager encapsulates @@ -2214,6 +2255,8 @@ class BaseTxContext(ITxContext): self._tx_state = _tx_ctx_impl.TxState(tx_mode) self._session_state = session_state self.session = session + self._finished = "" + self._allow_split_transactions = allow_split_transactions def __enter__(self): """ @@ -2271,6 +2314,9 @@ class BaseTxContext(ITxContext): :return: A result sets or exception in case of execution errors """ + + self._check_split() + return self._driver( _tx_ctx_impl.execute_request_factory( self._session_state, @@ -2297,8 +2343,12 @@ class BaseTxContext(ITxContext): :return: A committed transaction or exception if commit is failed """ + + self._set_finish(self._COMMIT) + if self._tx_state.tx_id is None and not self._tx_state.dead: return self + return self._driver( _tx_ctx_impl.commit_request_factory(self._session_state, self._tx_state), _apis.TableService.Stub, @@ -2318,8 +2368,12 @@ class BaseTxContext(ITxContext): :return: A rolled back transaction or exception if rollback is failed """ + + self._set_finish(self._ROLLBACK) + if self._tx_state.tx_id is None and not self._tx_state.dead: return self + return self._driver( _tx_ctx_impl.rollback_request_factory(self._session_state, self._tx_state), _apis.TableService.Stub, @@ -2340,6 +2394,9 @@ class BaseTxContext(ITxContext): """ if self._tx_state.tx_id is not None: return self + + self._check_split() + return self._driver( _tx_ctx_impl.begin_request_factory(self._session_state, self._tx_state), _apis.TableService.Stub, @@ -2350,6 +2407,21 @@ class BaseTxContext(ITxContext): self._session_state.endpoint, ) + def _set_finish(self, val): + self._check_split(val) + self._finished = val + + def _check_split(self, allow=""): + """ + Deny all operaions with transaction after commit/rollback. + Exception: double commit and double rollbacks, because it is safe + """ + if self._allow_split_transactions: + return + + if self._finished != "" and self._finished != allow: + raise RuntimeError("Any operation with finished transaction is denied") + class TxContext(BaseTxContext): @_utilities.wrap_async_call_exceptions @@ -2365,6 +2437,9 @@ class TxContext(BaseTxContext): :return: A future of query execution """ + + self._check_split() + return self._driver.future( _tx_ctx_impl.execute_request_factory( self._session_state, @@ -2396,8 +2471,11 @@ class TxContext(BaseTxContext): :return: A future of commit call """ + self._set_finish(self._COMMIT) + if self._tx_state.tx_id is None and not self._tx_state.dead: return _utilities.wrap_result_in_future(self) + return self._driver.future( _tx_ctx_impl.commit_request_factory(self._session_state, self._tx_state), _apis.TableService.Stub, @@ -2418,8 +2496,11 @@ class TxContext(BaseTxContext): :return: A future of rollback call """ + self._set_finish(self._ROLLBACK) + if self._tx_state.tx_id is None and not self._tx_state.dead: return _utilities.wrap_result_in_future(self) + return self._driver.future( _tx_ctx_impl.rollback_request_factory(self._session_state, self._tx_state), _apis.TableService.Stub, @@ -2441,6 +2522,9 @@ class TxContext(BaseTxContext): """ if self._tx_state.tx_id is not None: return _utilities.wrap_result_in_future(self) + + self._check_split() + return self._driver.future( _tx_ctx_impl.begin_request_factory(self._session_state, self._tx_state), _apis.TableService.Stub, diff --git a/ydb/public/sdk/python3/ydb/table_test.py b/ydb/public/sdk/python3/ydb/table_test.py new file mode 100644 index 0000000000..2cb2a6a0d0 --- /dev/null +++ b/ydb/public/sdk/python3/ydb/table_test.py @@ -0,0 +1,140 @@ +from unittest import mock +from ydb import ( + retry_operation_impl, + YdbRetryOperationFinalResult, + issues, + YdbRetryOperationSleepOpt, + RetrySettings, +) + + +def test_retry_operation_impl(monkeypatch): + monkeypatch.setattr("random.random", lambda: 0.5) + monkeypatch.setattr( + issues.Error, + "__eq__", + lambda self, other: type(self) == type(other) and self.message == other.message, + ) + + retry_once_settings = RetrySettings( + max_retries=1, + on_ydb_error_callback=mock.Mock(), + ) + retry_once_settings.unknown_error_handler = mock.Mock() + + def get_results(callee): + res_generator = retry_operation_impl(callee, retry_settings=retry_once_settings) + results = [] + exc = None + try: + for res in res_generator: + results.append(res) + if isinstance(res, YdbRetryOperationFinalResult): + break + except Exception as e: + exc = e + + return results, exc + + class TestException(Exception): + def __init__(self, message): + super(TestException, self).__init__(message) + self.message = message + + def __eq__(self, other): + return type(self) == type(other) and self.message == other.message + + def check_unretriable_error(err_type, call_ydb_handler): + retry_once_settings.on_ydb_error_callback.reset_mock() + retry_once_settings.unknown_error_handler.reset_mock() + + results = get_results( + mock.Mock(side_effect=[err_type("test1"), err_type("test2")]) + ) + yields = results[0] + exc = results[1] + + assert yields == [] + assert exc == err_type("test1") + + if call_ydb_handler: + assert retry_once_settings.on_ydb_error_callback.call_count == 1 + retry_once_settings.on_ydb_error_callback.assert_called_with( + err_type("test1") + ) + + assert retry_once_settings.unknown_error_handler.call_count == 0 + else: + assert retry_once_settings.on_ydb_error_callback.call_count == 0 + + assert retry_once_settings.unknown_error_handler.call_count == 1 + retry_once_settings.unknown_error_handler.assert_called_with( + err_type("test1") + ) + + def check_retriable_error(err_type, backoff): + retry_once_settings.on_ydb_error_callback.reset_mock() + + results = get_results( + mock.Mock(side_effect=[err_type("test1"), err_type("test2")]) + ) + yields = results[0] + exc = results[1] + + if backoff: + assert [ + YdbRetryOperationSleepOpt(backoff.calc_timeout(0)), + YdbRetryOperationSleepOpt(backoff.calc_timeout(1)), + ] == yields + else: + assert [] == yields + + assert exc == err_type("test2") + + assert retry_once_settings.on_ydb_error_callback.call_count == 2 + retry_once_settings.on_ydb_error_callback.assert_any_call(err_type("test1")) + retry_once_settings.on_ydb_error_callback.assert_called_with(err_type("test2")) + + assert retry_once_settings.unknown_error_handler.call_count == 0 + + # check ok + assert get_results(lambda: True) == ([YdbRetryOperationFinalResult(True)], None) + + # check retry error and return result + assert get_results(mock.Mock(side_effect=[issues.Overloaded("test"), True])) == ( + [ + YdbRetryOperationSleepOpt(retry_once_settings.slow_backoff.calc_timeout(0)), + YdbRetryOperationFinalResult(True), + ], + None, + ) + + # check errors + check_retriable_error(issues.Aborted, None) + check_retriable_error(issues.BadSession, None) + + check_retriable_error(issues.NotFound, None) + with mock.patch.object(retry_once_settings, "retry_not_found", False): + check_unretriable_error(issues.NotFound, True) + + check_retriable_error(issues.InternalError, None) + with mock.patch.object(retry_once_settings, "retry_internal_error", False): + check_unretriable_error(issues.InternalError, True) + + check_retriable_error(issues.Overloaded, retry_once_settings.slow_backoff) + check_retriable_error(issues.SessionPoolEmpty, retry_once_settings.slow_backoff) + check_retriable_error(issues.ConnectionError, retry_once_settings.slow_backoff) + + check_retriable_error(issues.Unavailable, retry_once_settings.fast_backoff) + + check_unretriable_error(issues.Undetermined, True) + with mock.patch.object(retry_once_settings, "idempotent", True): + check_retriable_error(issues.Unavailable, retry_once_settings.fast_backoff) + + check_unretriable_error(issues.Error, True) + with mock.patch.object(retry_once_settings, "idempotent", True): + check_unretriable_error(issues.Error, True) + + check_unretriable_error(TestException, False) + with mock.patch.object(retry_once_settings, "idempotent", True): + check_unretriable_error(TestException, False) diff --git a/ydb/public/sdk/python3/ydb/types.py b/ydb/public/sdk/python3/ydb/types.py index 598a9013aa..a62c8a74a0 100644 --- a/ydb/public/sdk/python3/ydb/types.py +++ b/ydb/public/sdk/python3/ydb/types.py @@ -12,6 +12,7 @@ from google.protobuf import struct_pb2 _SECONDS_IN_DAY = 60 * 60 * 24 _EPOCH = datetime(1970, 1, 1) + if six.PY3: _from_bytes = None else: @@ -20,13 +21,20 @@ else: return _utilities.from_bytes(x) -def _from_date_number(x, table_client_settings): +def _from_date(x, table_client_settings): if ( table_client_settings is not None and table_client_settings._native_date_in_result_sets ): - return date.fromordinal(x + date(1970, 1, 1).toordinal()) - return x + return _EPOCH.date() + timedelta(days=x.uint32_value) + return x.uint32_value + + +def _to_date(pb, value): + if isinstance(value, date): + pb.uint32_value = (value - _EPOCH.date()).days + else: + pb.uint32_value = value def _from_datetime_number(x, table_client_settings): @@ -122,8 +130,9 @@ class PrimitiveType(enum.Enum): UUID = (_apis.primitive_types.UUID, None, _to_uuid, _from_uuid) Date = ( _apis.primitive_types.DATE, - "uint32_value", - _from_date_number, + None, + _from_date, + _to_date, ) Datetime = ( _apis.primitive_types.DATETIME, diff --git a/ydb/public/sdk/python3/ydb/ydb_version.py b/ydb/public/sdk/python3/ydb/ydb_version.py index 24acd1ff32..dde8252c21 100644 --- a/ydb/public/sdk/python3/ydb/ydb_version.py +++ b/ydb/public/sdk/python3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "2.10.0" +VERSION = "2.13.2" diff --git a/ydb/public/sdk/ya.make b/ydb/public/sdk/ya.make index 2507276520..0cb53806e9 100644 --- a/ydb/public/sdk/ya.make +++ b/ydb/public/sdk/ya.make @@ -1,4 +1,6 @@ RECURSE( cpp python + python2 + python3 ) |