diff options
| author | robot-piglet <[email protected]> | 2024-12-04 14:13:07 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2024-12-04 14:25:45 +0300 |
| commit | dd4950b261106312653a90df9501dc021e2ba6f9 (patch) | |
| tree | 19fe318d45c9ff528b0d34a43ab0c7b80e56f264 /contrib/python | |
| parent | 2306c20638ac023c11d49898ef5c723df3290a47 (diff) | |
Intermediate changes
commit_hash:06b411ff8b073c2988bf379ede4740bce56ac3e7
Diffstat (limited to 'contrib/python')
21 files changed, 610 insertions, 80 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index d78acdc0709..756e15d29af 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.18.1 +Version: 3.18.10 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch b/contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch index fa8a442a9e5..19914950e6b 100644 --- a/contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch +++ b/contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch @@ -1,23 +1,27 @@ --- contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (index) +++ contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (working tree) -@@ -1,38 +1,8 @@ - import sys - +@@ -1,46 +0,0 @@ +-import sys +- -import google.protobuf -from packaging.version import Version -from ... import _utilities -+from contrib.ydb.public.api.grpc import * # noqa -+sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"] - +- -# generated files are incompatible between 3 and 4 protobuf versions -# import right generated version for current protobuf lib -# sdk code must always import from ydb._grpc.common -protobuf_version = Version(google.protobuf.__version__) -+from contrib.ydb.public.api import protos -+sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"] - +- -# for compatible with arcadia --if _utilities.check_module_exists("ydb.public.api"): +-if _utilities.check_module_exists("contrib.ydb.public.api"): +- from contrib.ydb.public.api.grpc import * # noqa +- +- sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"] +- +- from contrib.ydb.public.api import protos +- +- sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"] +-elif _utilities.check_module_exists("ydb.public.api"): - from ydb.public.api.grpc import * # noqa - - sys.modules["ydb._grpc.common"] = sys.modules["ydb.public.api.grpc"] diff --git a/contrib/python/ydb/py3/patches/03-old-behavior-compatible.patch b/contrib/python/ydb/py3/patches/02-old-behavior-compatible.patch index ef00525b167..ef00525b167 100644 --- a/contrib/python/ydb/py3/patches/03-old-behavior-compatible.patch +++ b/contrib/python/ydb/py3/patches/02-old-behavior-compatible.patch diff --git a/contrib/python/ydb/py3/patches/02-original-ydb-protobuf-compatible.patch b/contrib/python/ydb/py3/patches/02-original-ydb-protobuf-compatible.patch deleted file mode 100644 index 5303e5377db..00000000000 --- a/contrib/python/ydb/py3/patches/02-original-ydb-protobuf-compatible.patch +++ /dev/null @@ -1,21 +0,0 @@ ---- contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (index) -+++ contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (working tree) -@@ -1,8 +1,13 @@ - import sys -+try: -+ from ydb.public.api.grpc import * # noqa -+ sys.modules["ydb._grpc.common"] = sys.modules["ydb.public.api.grpc"] - --from contrib.ydb.public.api.grpc import * # noqa --sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"] -- --from contrib.ydb.public.api import protos --sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"] -+ from ydb.public.api import protos -+ sys.modules["ydb._grpc.common.protos"] = sys.modules["ydb.public.api.protos"] -+except ImportError: -+ from contrib.ydb.public.api.grpc import * # noqa -+ sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"] - -+ from contrib.ydb.public.api import protos -+ sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"] diff --git a/contrib/python/ydb/py3/patches/04-change-inport-protobufs.patch b/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch index 1874c32aaf5..1874c32aaf5 100644 --- a/contrib/python/ydb/py3/patches/04-change-inport-protobufs.patch +++ b/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch diff --git a/contrib/python/ydb/py3/patches/05-change-import-protobufs-fixes.patch b/contrib/python/ydb/py3/patches/04-change-import-protobufs-fixes.patch index 20e862d3f62..a4d12275f0a 100644 --- a/contrib/python/ydb/py3/patches/05-change-import-protobufs-fixes.patch +++ b/contrib/python/ydb/py3/patches/04-change-import-protobufs-fixes.patch @@ -1,24 +1,8 @@ ---- contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (index) -+++ contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (working tree) -@@ -1,13 +0,0 @@ --import sys --try: -- from ydb.public.api.grpc import * # noqa -- sys.modules["ydb._grpc.common"] = sys.modules["ydb.public.api.grpc"] -- -- from ydb.public.api import protos -- sys.modules["ydb._grpc.common.protos"] = sys.modules["ydb.public.api.protos"] --except ImportError: -- from contrib.ydb.public.api.grpc import * # noqa -- sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"] -- -- from contrib.ydb.public.api import protos -- sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"] --- contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py (index) +++ contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py (working tree) @@ -27,9 +27,9 @@ from google.protobuf.timestamp_pb2 import Timestamp as ProtoTimeStamp import ydb.aio - + try: - from ..v4.protos import ydb_topic_pb2, ydb_issue_message_pb2 -else: @@ -26,14 +10,14 @@ + from ydb.public.api.protos import ydb_topic_pb2, ydb_issue_message_pb2 +except ImportError: + from contrib.ydb.public.api.protos import ydb_topic_pb2, ydb_issue_message_pb2 - + from ... import issues, connection - + --- contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py (index) +++ contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py (working tree) @@ -13,9 +13,9 @@ from ... import scheme from ... import issues - + try: - from ..v4.protos import ydb_scheme_pb2, ydb_topic_pb2 -else: @@ -41,14 +25,14 @@ + from ydb.public.api.protos import ydb_scheme_pb2, ydb_topic_pb2 +except ImportError: + from contrib.ydb.public.api.protos import ydb_scheme_pb2, ydb_topic_pb2 - + from .common_utils import ( IFromProto, --- contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py (index) +++ contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py (working tree) @@ -5,9 +5,9 @@ from enum import IntEnum from typing import Optional, List, Union, Dict - + try: - from ..v4.protos import ydb_topic_pb2 -else: @@ -56,6 +40,6 @@ + from ydb.public.api.protos import ydb_topic_pb2 +except ImportError: + from contrib.ydb.public.api.protos import ydb_topic_pb2 - + from .common_utils import IToProto from ...scheme import SchemeEntry diff --git a/contrib/python/ydb/py3/patches/06-change-import-protobufs-query.patch b/contrib/python/ydb/py3/patches/05-change-import-protobufs-query.patch index 3c43ec0947d..1db5f29ac66 100644 --- a/contrib/python/ydb/py3/patches/06-change-import-protobufs-query.patch +++ b/contrib/python/ydb/py3/patches/05-change-import-protobufs-query.patch @@ -36,7 +36,7 @@ class BaseQueryTxMode(IToProto): --- contrib/python/ydb/py3/ydb/draft/_apis.py (index) +++ contrib/python/ydb/py3/ydb/draft/_apis.py (working tree) -@@ -1,21 +1,20 @@ +@@ -1,28 +1,22 @@ # -*- coding: utf-8 -*- import typing @@ -59,8 +59,17 @@ ydb_dynamic_config_v1_pb2_grpc, ) -- from .._grpc.common.draft.protos import ( +- try: +- from .._grpc.common.draft.protos import ( +- ydb_dynamic_config_pb2, +- ) +- except ImportError: +- from .._grpc.common.protos.draft import ( +- ydb_dynamic_config_pb2, +- ) + from contrib.ydb.public.api.protos.draft import ( - ydb_dynamic_config_pb2, - ) ++ ydb_dynamic_config_pb2, ++ ) + + ydb_dynamic_config = ydb_dynamic_config_pb2 diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index 5d5e6636e39..a643d0b0b40 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.18.1) +VERSION(3.18.10) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py index 0b5ec41df7e..24f3db30bc4 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py @@ -69,6 +69,10 @@ class QueryOnlineReadOnly(BaseQueryTxMode): def name(self): return self._name + def with_allow_inconsistent_reads(self) -> "QueryOnlineReadOnly": + self.allow_inconsistent_reads = True + return self + def to_proto(self) -> ydb_query_pb2.OnlineModeSettings: return ydb_query_pb2.OnlineModeSettings(allow_inconsistent_reads=self.allow_inconsistent_reads) diff --git a/contrib/python/ydb/py3/ydb/_utilities.py b/contrib/python/ydb/py3/ydb/_utilities.py index e89b0af315d..117c7407f24 100644 --- a/contrib/python/ydb/py3/ydb/_utilities.py +++ b/contrib/python/ydb/py3/ydb/_utilities.py @@ -182,3 +182,21 @@ class AtomicCounter: with self._lock: self._value += 1 return self._value + + +def get_first_message_with_timeout(status_stream: SyncResponseIterator, timeout: int): + waiter = future() + + def get_first_response(waiter): + first_response = next(status_stream) + waiter.set_result(first_response) + + thread = threading.Thread( + target=get_first_response, + args=(waiter,), + name="first response attach stream thread", + daemon=True, + ) + thread.start() + + return waiter.result(timeout=timeout) diff --git a/contrib/python/ydb/py3/ydb/aio/_utilities.py b/contrib/python/ydb/py3/ydb/aio/_utilities.py index 454378b0d40..5bd0f1a0471 100644 --- a/contrib/python/ydb/py3/ydb/aio/_utilities.py +++ b/contrib/python/ydb/py3/ydb/aio/_utilities.py @@ -1,3 +1,6 @@ +import asyncio + + class AsyncResponseIterator(object): def __init__(self, it, wrapper): self.it = it.__aiter__() @@ -21,3 +24,10 @@ class AsyncResponseIterator(object): async def __anext__(self): return await self._next() + + +async def get_first_message_with_timeout(stream: AsyncResponseIterator, timeout: int): + async def get_first_response(): + return await stream.next() + + return await asyncio.wait_for(get_first_response(), timeout) diff --git a/contrib/python/ydb/py3/ydb/aio/driver.py b/contrib/python/ydb/py3/ydb/aio/driver.py index 0f4f3630f96..9cd6fd2b74d 100644 --- a/contrib/python/ydb/py3/ydb/aio/driver.py +++ b/contrib/python/ydb/py3/ydb/aio/driver.py @@ -59,3 +59,7 @@ class Driver(pool.ConnectionPool): self.scheme_client = scheme.SchemeClient(self) self.table_client = table.TableClient(self, config.table_client_settings) self.topic_client = topic.TopicClientAsyncIO(self, config.topic_client_settings) + + async def stop(self, timeout=10): + await self.table_client._stop_pool_if_needed(timeout=timeout) + await super().stop(timeout=timeout) diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py index 6d116600c98..456896dbb50 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/pool.py +++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py @@ -13,6 +13,7 @@ from ...retries import ( RetrySettings, retry_operation_async, ) +from ...query.base import QueryClientSettings from ... import convert from ..._grpc.grpcwrapper import common_utils @@ -22,10 +23,18 @@ logger = logging.getLogger(__name__) class QuerySessionPool: """QuerySessionPool is an object to simplify operations with sessions of Query Service.""" - def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100): + def __init__( + self, + driver: common_utils.SupportedDriverType, + size: int = 100, + *, + query_client_settings: Optional[QueryClientSettings] = None, + loop: Optional[asyncio.AbstractEventLoop] = None, + ): """ :param driver: A driver instance :param size: Size of session pool + :param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior """ self._driver = driver @@ -34,10 +43,11 @@ class QuerySessionPool: self._queue = asyncio.Queue() self._current_size = 0 self._waiters = 0 - self._loop = asyncio.get_running_loop() + self._loop = asyncio.get_running_loop() if loop is None else loop + self._query_client_settings = query_client_settings async def _create_new_session(self): - session = QuerySession(self._driver) + session = QuerySession(self._driver, settings=self._query_client_settings) await session.create() logger.debug(f"New session was created for pool. Session id: {session._state.session_id}") return session diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py index 779eb3f0235..0561de8c391 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/session.py +++ b/contrib/python/ydb/py3/ydb/aio/query/session.py @@ -15,6 +15,7 @@ from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public from ...query import base from ...query.session import ( BaseQuerySession, + DEFAULT_ATTACH_FIRST_RESP_TIMEOUT, QuerySessionStateEnum, ) @@ -43,9 +44,17 @@ class QuerySession(BaseQuerySession): lambda response: common_utils.ServerStatus.from_proto(response), ) - first_response = await self._status_stream.next() - if first_response.status != issues.StatusCode.SUCCESS: - pass + try: + first_response = await _utilities.get_first_message_with_timeout( + self._status_stream, + DEFAULT_ATTACH_FIRST_RESP_TIMEOUT, + ) + if first_response.status != issues.StatusCode.SUCCESS: + raise RuntimeError("Failed to attach session") + except Exception as e: + self._state.reset() + self._status_stream.cancel() + raise e self._state.set_attached(True) self._state._change_state(QuerySessionStateEnum.CREATED) diff --git a/contrib/python/ydb/py3/ydb/aio/table.py b/contrib/python/ydb/py3/ydb/aio/table.py index 3c25f7d20ac..aec32e1a26e 100644 --- a/contrib/python/ydb/py3/ydb/aio/table.py +++ b/contrib/python/ydb/py3/ydb/aio/table.py @@ -3,6 +3,14 @@ import logging import time import typing +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, +) + import ydb from ydb import issues, settings as settings_impl, table @@ -13,6 +21,7 @@ from ydb.table import ( _scan_query_request_factory, _wrap_scan_query_response, BaseTxContext, + TableDescription, ) from . import _utilities from ydb import _apis, _session_impl @@ -139,6 +148,18 @@ class Session(BaseSession): class TableClient(BaseTableClient): + def __init__(self, driver, table_client_settings=None): + # type:(ydb.Driver, ydb.TableClientSettings) -> None + super().__init__(driver=driver, table_client_settings=table_client_settings) + self._pool: Optional[SessionPool] = None + + def __del__(self): + if self._pool is not None and not self._pool._terminating: + try: + asyncio.get_running_loop().create_task(self._stop_pool_if_needed()) + except Exception: + pass + def session(self): return Session(self._driver, self._table_client_settings) @@ -158,6 +179,214 @@ class TableClient(BaseTableClient): lambda resp: _wrap_scan_query_response(resp, self._table_client_settings), ) + def _init_pool_if_needed(self): + if self._pool is None: + self._pool = SessionPool(self._driver, 10) + + async def _stop_pool_if_needed(self, timeout=10): + if self._pool is not None and not self._pool._terminating: + await self._pool.stop(timeout=timeout) + self._pool = None + + async def create_table( + self, + path: str, + table_description: "TableDescription", + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Create a YDB table. + + :param path: A table path + :param table_description: TableDescription instanse. + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.create_table(path=path, table_description=table_description, settings=settings) + + return await self._pool.retry_operation(callee) + + async def drop_table( + self, + path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Drop a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.drop_table(path=path, settings=settings) + + return await self._pool.retry_operation(callee) + + async def alter_table( + self, + path: str, + add_columns: Optional[List["ydb.Column"]] = None, + drop_columns: Optional[List[str]] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + alter_attributes: Optional[Optional[Dict[str, str]]] = None, + add_indexes: Optional[List["ydb.TableIndex"]] = None, + drop_indexes: Optional[List[str]] = None, + set_ttl_settings: Optional["ydb.TtlSettings"] = None, + drop_ttl_settings: Optional[Any] = None, + add_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_storage_settings: Optional["ydb.StorageSettings"] = None, + set_compaction_policy: Optional[str] = None, + alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None, + set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None, + set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None, + ) -> "ydb.Operation": + """ + Alter a YDB table. + + :param path: A table path + :param add_columns: List of ydb.Column to add + :param drop_columns: List of column names to drop + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + :param alter_attributes: Dict of attributes to alter + :param add_indexes: List of ydb.TableIndex to add + :param drop_indexes: List of index names to drop + :param set_ttl_settings: ydb.TtlSettings to set + :param drop_ttl_settings: Any to drop + :param add_column_families: List of ydb.ColumnFamily to add + :param alter_column_families: List of ydb.ColumnFamily to alter + :param alter_storage_settings: ydb.StorageSettings to alter + :param set_compaction_policy: Compaction policy + :param alter_partitioning_settings: ydb.PartitioningSettings to alter + :param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.alter_table( + path=path, + add_columns=add_columns, + drop_columns=drop_columns, + settings=settings, + alter_attributes=alter_attributes, + add_indexes=add_indexes, + drop_indexes=drop_indexes, + set_ttl_settings=set_ttl_settings, + drop_ttl_settings=drop_ttl_settings, + add_column_families=add_column_families, + alter_column_families=alter_column_families, + alter_storage_settings=alter_storage_settings, + set_compaction_policy=set_compaction_policy, + alter_partitioning_settings=alter_partitioning_settings, + set_key_bloom_filter=set_key_bloom_filter, + set_read_replicas_settings=set_read_replicas_settings, + ) + + return await self._pool.retry_operation(callee) + + async def describe_table( + self, + path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.TableSchemeEntry": + """ + Describe a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: TableSchemeEntry or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.describe_table(path=path, settings=settings) + + return await self._pool.retry_operation(callee) + + async def copy_table( + self, + source_path: str, + destination_path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Copy a YDB table. + + :param source_path: A table path + :param destination_path: Destination table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.copy_table( + source_path=source_path, + destination_path=destination_path, + settings=settings, + ) + + return await self._pool.retry_operation(callee) + + async def copy_tables( + self, + source_destination_pairs: List[Tuple[str, str]], + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Copy a YDB tables. + + :param source_destination_pairs: List of tuples (source_path, destination_path) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) + + return await self._pool.retry_operation(callee) + + async def rename_tables( + self, + rename_items: List[Tuple[str, str]], + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Rename a YDB tables. + + :param rename_items: List of tuples (current_name, desired_name) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.rename_tables(rename_items=rename_items, settings=settings) + + return await self._pool.retry_operation(callee) + class TxContext(BaseTxContext): async def __aenter__(self): diff --git a/contrib/python/ydb/py3/ydb/auth_helpers.py b/contrib/python/ydb/py3/ydb/auth_helpers.py index 6399c3cfdf0..abf7331af6e 100644 --- a/contrib/python/ydb/py3/ydb/auth_helpers.py +++ b/contrib/python/ydb/py3/ydb/auth_helpers.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import os +from typing import Optional def read_bytes(f): @@ -7,8 +8,8 @@ def read_bytes(f): return fr.read() -def load_ydb_root_certificate(): - path = os.getenv("YDB_SSL_ROOT_CERTIFICATES_FILE", None) +def load_ydb_root_certificate(path: Optional[str] = None): + path = path if path is not None else os.getenv("YDB_SSL_ROOT_CERTIFICATES_FILE", None) if path is not None and os.path.exists(path): return read_bytes(path) return None diff --git a/contrib/python/ydb/py3/ydb/driver.py b/contrib/python/ydb/py3/ydb/driver.py index 1559b0d00bc..49bd223c901 100644 --- a/contrib/python/ydb/py3/ydb/driver.py +++ b/contrib/python/ydb/py3/ydb/driver.py @@ -89,6 +89,7 @@ class DriverConfig(object): "secure_channel", "table_client_settings", "topic_client_settings", + "query_client_settings", "endpoints", "primary_user_agent", "tracer", @@ -112,6 +113,7 @@ class DriverConfig(object): grpc_keep_alive_timeout=None, table_client_settings=None, topic_client_settings=None, + query_client_settings=None, endpoints=None, primary_user_agent="python-library", tracer=None, @@ -159,6 +161,7 @@ class DriverConfig(object): self.grpc_keep_alive_timeout = grpc_keep_alive_timeout self.table_client_settings = table_client_settings self.topic_client_settings = topic_client_settings + self.query_client_settings = query_client_settings self.primary_user_agent = primary_user_agent self.tracer = tracer if tracer is not None else tracing.Tracer(None) self.grpc_lb_policy_name = grpc_lb_policy_name @@ -282,3 +285,7 @@ class Driver(pool.ConnectionPool): self.scheme_client = scheme.SchemeClient(self) self.table_client = table.TableClient(self, driver_config.table_client_settings) self.topic_client = topic.TopicClient(self, driver_config.topic_client_settings) + + def stop(self, timeout=10): + self.table_client._stop_pool_if_needed(timeout=timeout) + super().stop(timeout=timeout) diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index 4c51a971ced..f1fcd17360e 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -8,6 +8,7 @@ import time import threading import queue +from .base import QueryClientSettings from .session import ( QuerySession, ) @@ -27,10 +28,17 @@ logger = logging.getLogger(__name__) class QuerySessionPool: """QuerySessionPool is an object to simplify operations with sessions of Query Service.""" - def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100): + def __init__( + self, + driver: common_utils.SupportedDriverType, + size: int = 100, + *, + query_client_settings: Optional[QueryClientSettings] = None, + ): """ :param driver: A driver instance. :param size: Max size of Session Pool. + :param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior """ self._driver = driver @@ -39,9 +47,10 @@ class QuerySessionPool: self._size = size self._should_stop = threading.Event() self._lock = threading.RLock() + self._query_client_settings = query_client_settings def _create_new_session(self, timeout: Optional[float]): - session = QuerySession(self._driver) + session = QuerySession(self._driver, settings=self._query_client_settings) session.create(settings=BaseRequestSettings().with_timeout(timeout)) logger.debug(f"New session was created for pool. Session id: {session._state.session_id}") return session diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py index e13540d357b..382c922d5e6 100644 --- a/contrib/python/ydb/py3/ydb/query/session.py +++ b/contrib/python/ydb/py3/ydb/query/session.py @@ -22,6 +22,10 @@ from .transaction import QueryTxContext logger = logging.getLogger(__name__) +DEFAULT_ATTACH_FIRST_RESP_TIMEOUT = 600 +DEFAULT_ATTACH_LONG_TIMEOUT = 31536000 # year + + class QuerySessionStateEnum(enum.Enum): NOT_INITIALIZED = "NOT_INITIALIZED" CREATED = "CREATED" @@ -134,8 +138,25 @@ class BaseQuerySession: def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None): self._driver = driver - self._settings = settings if settings is not None else base.QueryClientSettings() + self._settings = self._get_client_settings(driver, settings) self._state = QuerySessionState(settings) + self._attach_settings: BaseRequestSettings = ( + BaseRequestSettings() + .with_operation_timeout(DEFAULT_ATTACH_LONG_TIMEOUT) + .with_cancel_after(DEFAULT_ATTACH_LONG_TIMEOUT) + .with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT) + ) + + def _get_client_settings( + self, + driver: common_utils.SupportedDriverType, + settings: Optional[base.QueryClientSettings] = None, + ) -> base.QueryClientSettings: + if settings is not None: + return settings + if driver._driver_config.query_client_settings is not None: + return driver._driver_config.query_client_settings + return base.QueryClientSettings() def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession": return self._driver( @@ -157,12 +178,12 @@ class BaseQuerySession: settings=settings, ) - def _attach_call(self, settings: Optional[BaseRequestSettings] = None) -> Iterable[_apis.ydb_query.SessionState]: + def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]: return self._driver( _apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id), _apis.QueryService.Stub, _apis.QueryService.AttachSession, - settings=settings, + settings=self._attach_settings, ) def _execute_call( @@ -202,16 +223,24 @@ class QuerySession(BaseQuerySession): _stream = None - def _attach(self, settings: Optional[BaseRequestSettings] = None) -> None: - self._stream = self._attach_call(settings=settings) + def _attach(self, first_resp_timeout: int = DEFAULT_ATTACH_FIRST_RESP_TIMEOUT) -> None: + self._stream = self._attach_call() status_stream = _utilities.SyncResponseIterator( self._stream, lambda response: common_utils.ServerStatus.from_proto(response), ) - first_response = next(status_stream) - if first_response.status != issues.StatusCode.SUCCESS: - pass + try: + first_response = _utilities.get_first_message_with_timeout( + status_stream, + first_resp_timeout, + ) + if first_response.status != issues.StatusCode.SUCCESS: + raise RuntimeError("Failed to attach session") + except Exception as e: + self._state.reset() + status_stream.cancel() + raise e self._state.set_attached(True) self._state._change_state(QuerySessionStateEnum.CREATED) @@ -219,7 +248,7 @@ class QuerySession(BaseQuerySession): threading.Thread( target=self._check_session_status_loop, args=(status_stream,), - name="check session status thread", + name="attach stream thread", daemon=True, ).start() diff --git a/contrib/python/ydb/py3/ydb/table.py b/contrib/python/ydb/py3/ydb/table.py index 01f5e52b6ae..945e9187677 100644 --- a/contrib/python/ydb/py3/ydb/table.py +++ b/contrib/python/ydb/py3/ydb/table.py @@ -4,6 +4,15 @@ import ydb from abc import abstractmethod import logging import enum +import typing + +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, +) from . import ( issues, @@ -1193,6 +1202,14 @@ class BaseTableClient(ITableClient): class TableClient(BaseTableClient): + def __init__(self, driver, table_client_settings=None): + # type:(ydb.Driver, ydb.TableClientSettings) -> None + super().__init__(driver=driver, table_client_settings=table_client_settings) + self._pool: Optional[SessionPool] = None + + def __del__(self): + self._stop_pool_if_needed() + def async_scan_query(self, query, parameters=None, settings=None): # type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.AsyncResponseIterator request = _scan_query_request_factory(query, parameters, settings) @@ -1219,6 +1236,213 @@ class TableClient(BaseTableClient): (), ) + def _init_pool_if_needed(self): + if self._pool is None: + self._pool = SessionPool(self._driver, 10) + + def _stop_pool_if_needed(self, timeout=10): + if self._pool is not None: + self._pool.stop(timeout=timeout) + + def create_table( + self, + path: str, + table_description: "TableDescription", + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Create a YDB table. + + :param path: A table path + :param table_description: TableDescription instanse. + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.create_table(path=path, table_description=table_description, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def drop_table( + self, + path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Drop a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.drop_table(path=path, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def alter_table( + self, + path: str, + add_columns: Optional[List["ydb.Column"]] = None, + drop_columns: Optional[List[str]] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + alter_attributes: Optional[Optional[Dict[str, str]]] = None, + add_indexes: Optional[List["ydb.TableIndex"]] = None, + drop_indexes: Optional[List[str]] = None, + set_ttl_settings: Optional["ydb.TtlSettings"] = None, + drop_ttl_settings: Optional[Any] = None, + add_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_storage_settings: Optional["ydb.StorageSettings"] = None, + set_compaction_policy: Optional[str] = None, + alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None, + set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None, + set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None, + ) -> "ydb.Operation": + """ + Alter a YDB table. + + :param path: A table path + :param add_columns: List of ydb.Column to add + :param drop_columns: List of column names to drop + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + :param alter_attributes: Dict of attributes to alter + :param add_indexes: List of ydb.TableIndex to add + :param drop_indexes: List of index names to drop + :param set_ttl_settings: ydb.TtlSettings to set + :param drop_ttl_settings: Any to drop + :param add_column_families: List of ydb.ColumnFamily to add + :param alter_column_families: List of ydb.ColumnFamily to alter + :param alter_storage_settings: ydb.StorageSettings to alter + :param set_compaction_policy: Compaction policy + :param alter_partitioning_settings: ydb.PartitioningSettings to alter + :param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.alter_table( + path=path, + add_columns=add_columns, + drop_columns=drop_columns, + settings=settings, + alter_attributes=alter_attributes, + add_indexes=add_indexes, + drop_indexes=drop_indexes, + set_ttl_settings=set_ttl_settings, + drop_ttl_settings=drop_ttl_settings, + add_column_families=add_column_families, + alter_column_families=alter_column_families, + alter_storage_settings=alter_storage_settings, + set_compaction_policy=set_compaction_policy, + alter_partitioning_settings=alter_partitioning_settings, + set_key_bloom_filter=set_key_bloom_filter, + set_read_replicas_settings=set_read_replicas_settings, + ) + + return self._pool.retry_operation_sync(callee) + + def describe_table( + self, + path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.TableSchemeEntry": + """ + Describe a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: TableSchemeEntry or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.describe_table(path=path, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def copy_table( + self, + source_path: str, + destination_path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Copy a YDB table. + + :param source_path: A table path + :param destination_path: Destination table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.copy_table( + source_path=source_path, + destination_path=destination_path, + settings=settings, + ) + + return self._pool.retry_operation_sync(callee) + + def copy_tables( + self, + source_destination_pairs: List[Tuple[str, str]], + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Copy a YDB tables. + + :param source_destination_pairs: List of tuples (source_path, destination_path) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def rename_tables( + self, + rename_items: List[Tuple[str, str]], + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Rename a YDB tables. + + :param rename_items: List of tuples (current_name, desired_name) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.rename_tables(rename_items=rename_items, settings=settings) + + return self._pool.retry_operation_sync(callee) + def _make_index_description(index): result = TableIndex(index.name).with_index_columns(*tuple(col for col in index.index_columns)) diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index f890bc7248e..6d13618e64c 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.18.1" +VERSION = "3.18.10" |
