summaryrefslogtreecommitdiffstats
path: root/contrib/python
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2024-12-04 14:13:07 +0300
committerrobot-piglet <[email protected]>2024-12-04 14:25:45 +0300
commitdd4950b261106312653a90df9501dc021e2ba6f9 (patch)
tree19fe318d45c9ff528b0d34a43ab0c7b80e56f264 /contrib/python
parent2306c20638ac023c11d49898ef5c723df3290a47 (diff)
Intermediate changes
commit_hash:06b411ff8b073c2988bf379ede4740bce56ac3e7
Diffstat (limited to 'contrib/python')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch24
-rw-r--r--contrib/python/ydb/py3/patches/02-old-behavior-compatible.patch (renamed from contrib/python/ydb/py3/patches/03-old-behavior-compatible.patch)0
-rw-r--r--contrib/python/ydb/py3/patches/02-original-ydb-protobuf-compatible.patch21
-rw-r--r--contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch (renamed from contrib/python/ydb/py3/patches/04-change-inport-protobufs.patch)0
-rw-r--r--contrib/python/ydb/py3/patches/04-change-import-protobufs-fixes.patch (renamed from contrib/python/ydb/py3/patches/05-change-import-protobufs-fixes.patch)30
-rw-r--r--contrib/python/ydb/py3/patches/05-change-import-protobufs-query.patch (renamed from contrib/python/ydb/py3/patches/06-change-import-protobufs-query.patch)17
-rw-r--r--contrib/python/ydb/py3/ya.make2
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py4
-rw-r--r--contrib/python/ydb/py3/ydb/_utilities.py18
-rw-r--r--contrib/python/ydb/py3/ydb/aio/_utilities.py10
-rw-r--r--contrib/python/ydb/py3/ydb/aio/driver.py4
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/pool.py16
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/session.py15
-rw-r--r--contrib/python/ydb/py3/ydb/aio/table.py229
-rw-r--r--contrib/python/ydb/py3/ydb/auth_helpers.py5
-rw-r--r--contrib/python/ydb/py3/ydb/driver.py7
-rw-r--r--contrib/python/ydb/py3/ydb/query/pool.py13
-rw-r--r--contrib/python/ydb/py3/ydb/query/session.py47
-rw-r--r--contrib/python/ydb/py3/ydb/table.py224
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
21 files changed, 610 insertions, 80 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index d78acdc0709..756e15d29af 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.18.1
+Version: 3.18.10
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch b/contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch
index fa8a442a9e5..19914950e6b 100644
--- a/contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch
+++ b/contrib/python/ydb/py3/patches/01-arcadia-protobufs.patch
@@ -1,23 +1,27 @@
--- contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (index)
+++ contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (working tree)
-@@ -1,38 +1,8 @@
- import sys
-
+@@ -1,46 +0,0 @@
+-import sys
+-
-import google.protobuf
-from packaging.version import Version
-from ... import _utilities
-+from contrib.ydb.public.api.grpc import * # noqa
-+sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"]
-
+-
-# generated files are incompatible between 3 and 4 protobuf versions
-# import right generated version for current protobuf lib
-# sdk code must always import from ydb._grpc.common
-protobuf_version = Version(google.protobuf.__version__)
-+from contrib.ydb.public.api import protos
-+sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"]
-
+-
-# for compatible with arcadia
--if _utilities.check_module_exists("ydb.public.api"):
+-if _utilities.check_module_exists("contrib.ydb.public.api"):
+- from contrib.ydb.public.api.grpc import * # noqa
+-
+- sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"]
+-
+- from contrib.ydb.public.api import protos
+-
+- sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"]
+-elif _utilities.check_module_exists("ydb.public.api"):
- from ydb.public.api.grpc import * # noqa
-
- sys.modules["ydb._grpc.common"] = sys.modules["ydb.public.api.grpc"]
diff --git a/contrib/python/ydb/py3/patches/03-old-behavior-compatible.patch b/contrib/python/ydb/py3/patches/02-old-behavior-compatible.patch
index ef00525b167..ef00525b167 100644
--- a/contrib/python/ydb/py3/patches/03-old-behavior-compatible.patch
+++ b/contrib/python/ydb/py3/patches/02-old-behavior-compatible.patch
diff --git a/contrib/python/ydb/py3/patches/02-original-ydb-protobuf-compatible.patch b/contrib/python/ydb/py3/patches/02-original-ydb-protobuf-compatible.patch
deleted file mode 100644
index 5303e5377db..00000000000
--- a/contrib/python/ydb/py3/patches/02-original-ydb-protobuf-compatible.patch
+++ /dev/null
@@ -1,21 +0,0 @@
---- contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (index)
-+++ contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (working tree)
-@@ -1,8 +1,13 @@
- import sys
-+try:
-+ from ydb.public.api.grpc import * # noqa
-+ sys.modules["ydb._grpc.common"] = sys.modules["ydb.public.api.grpc"]
-
--from contrib.ydb.public.api.grpc import * # noqa
--sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"]
--
--from contrib.ydb.public.api import protos
--sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"]
-+ from ydb.public.api import protos
-+ sys.modules["ydb._grpc.common.protos"] = sys.modules["ydb.public.api.protos"]
-+except ImportError:
-+ from contrib.ydb.public.api.grpc import * # noqa
-+ sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"]
-
-+ from contrib.ydb.public.api import protos
-+ sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"]
diff --git a/contrib/python/ydb/py3/patches/04-change-inport-protobufs.patch b/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch
index 1874c32aaf5..1874c32aaf5 100644
--- a/contrib/python/ydb/py3/patches/04-change-inport-protobufs.patch
+++ b/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch
diff --git a/contrib/python/ydb/py3/patches/05-change-import-protobufs-fixes.patch b/contrib/python/ydb/py3/patches/04-change-import-protobufs-fixes.patch
index 20e862d3f62..a4d12275f0a 100644
--- a/contrib/python/ydb/py3/patches/05-change-import-protobufs-fixes.patch
+++ b/contrib/python/ydb/py3/patches/04-change-import-protobufs-fixes.patch
@@ -1,24 +1,8 @@
---- contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (index)
-+++ contrib/python/ydb/py3/ydb/_grpc/common/__init__.py (working tree)
-@@ -1,13 +0,0 @@
--import sys
--try:
-- from ydb.public.api.grpc import * # noqa
-- sys.modules["ydb._grpc.common"] = sys.modules["ydb.public.api.grpc"]
--
-- from ydb.public.api import protos
-- sys.modules["ydb._grpc.common.protos"] = sys.modules["ydb.public.api.protos"]
--except ImportError:
-- from contrib.ydb.public.api.grpc import * # noqa
-- sys.modules["ydb._grpc.common"] = sys.modules["contrib.ydb.public.api.grpc"]
--
-- from contrib.ydb.public.api import protos
-- sys.modules["ydb._grpc.common.protos"] = sys.modules["contrib.ydb.public.api.protos"]
--- contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py (index)
+++ contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py (working tree)
@@ -27,9 +27,9 @@ from google.protobuf.timestamp_pb2 import Timestamp as ProtoTimeStamp
import ydb.aio
-
+
try:
- from ..v4.protos import ydb_topic_pb2, ydb_issue_message_pb2
-else:
@@ -26,14 +10,14 @@
+ from ydb.public.api.protos import ydb_topic_pb2, ydb_issue_message_pb2
+except ImportError:
+ from contrib.ydb.public.api.protos import ydb_topic_pb2, ydb_issue_message_pb2
-
+
from ... import issues, connection
-
+
--- contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py (index)
+++ contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py (working tree)
@@ -13,9 +13,9 @@ from ... import scheme
from ... import issues
-
+
try:
- from ..v4.protos import ydb_scheme_pb2, ydb_topic_pb2
-else:
@@ -41,14 +25,14 @@
+ from ydb.public.api.protos import ydb_scheme_pb2, ydb_topic_pb2
+except ImportError:
+ from contrib.ydb.public.api.protos import ydb_scheme_pb2, ydb_topic_pb2
-
+
from .common_utils import (
IFromProto,
--- contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py (index)
+++ contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py (working tree)
@@ -5,9 +5,9 @@ from enum import IntEnum
from typing import Optional, List, Union, Dict
-
+
try:
- from ..v4.protos import ydb_topic_pb2
-else:
@@ -56,6 +40,6 @@
+ from ydb.public.api.protos import ydb_topic_pb2
+except ImportError:
+ from contrib.ydb.public.api.protos import ydb_topic_pb2
-
+
from .common_utils import IToProto
from ...scheme import SchemeEntry
diff --git a/contrib/python/ydb/py3/patches/06-change-import-protobufs-query.patch b/contrib/python/ydb/py3/patches/05-change-import-protobufs-query.patch
index 3c43ec0947d..1db5f29ac66 100644
--- a/contrib/python/ydb/py3/patches/06-change-import-protobufs-query.patch
+++ b/contrib/python/ydb/py3/patches/05-change-import-protobufs-query.patch
@@ -36,7 +36,7 @@
class BaseQueryTxMode(IToProto):
--- contrib/python/ydb/py3/ydb/draft/_apis.py (index)
+++ contrib/python/ydb/py3/ydb/draft/_apis.py (working tree)
-@@ -1,21 +1,20 @@
+@@ -1,28 +1,22 @@
# -*- coding: utf-8 -*-
import typing
@@ -59,8 +59,17 @@
ydb_dynamic_config_v1_pb2_grpc,
)
-- from .._grpc.common.draft.protos import (
+- try:
+- from .._grpc.common.draft.protos import (
+- ydb_dynamic_config_pb2,
+- )
+- except ImportError:
+- from .._grpc.common.protos.draft import (
+- ydb_dynamic_config_pb2,
+- )
+ from contrib.ydb.public.api.protos.draft import (
- ydb_dynamic_config_pb2,
- )
++ ydb_dynamic_config_pb2,
++ )
+
+ ydb_dynamic_config = ydb_dynamic_config_pb2
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index 5d5e6636e39..a643d0b0b40 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.18.1)
+VERSION(3.18.10)
LICENSE(Apache-2.0)
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py
index 0b5ec41df7e..24f3db30bc4 100644
--- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py
@@ -69,6 +69,10 @@ class QueryOnlineReadOnly(BaseQueryTxMode):
def name(self):
return self._name
+ def with_allow_inconsistent_reads(self) -> "QueryOnlineReadOnly":
+ self.allow_inconsistent_reads = True
+ return self
+
def to_proto(self) -> ydb_query_pb2.OnlineModeSettings:
return ydb_query_pb2.OnlineModeSettings(allow_inconsistent_reads=self.allow_inconsistent_reads)
diff --git a/contrib/python/ydb/py3/ydb/_utilities.py b/contrib/python/ydb/py3/ydb/_utilities.py
index e89b0af315d..117c7407f24 100644
--- a/contrib/python/ydb/py3/ydb/_utilities.py
+++ b/contrib/python/ydb/py3/ydb/_utilities.py
@@ -182,3 +182,21 @@ class AtomicCounter:
with self._lock:
self._value += 1
return self._value
+
+
+def get_first_message_with_timeout(status_stream: SyncResponseIterator, timeout: int):
+ waiter = future()
+
+ def get_first_response(waiter):
+ first_response = next(status_stream)
+ waiter.set_result(first_response)
+
+ thread = threading.Thread(
+ target=get_first_response,
+ args=(waiter,),
+ name="first response attach stream thread",
+ daemon=True,
+ )
+ thread.start()
+
+ return waiter.result(timeout=timeout)
diff --git a/contrib/python/ydb/py3/ydb/aio/_utilities.py b/contrib/python/ydb/py3/ydb/aio/_utilities.py
index 454378b0d40..5bd0f1a0471 100644
--- a/contrib/python/ydb/py3/ydb/aio/_utilities.py
+++ b/contrib/python/ydb/py3/ydb/aio/_utilities.py
@@ -1,3 +1,6 @@
+import asyncio
+
+
class AsyncResponseIterator(object):
def __init__(self, it, wrapper):
self.it = it.__aiter__()
@@ -21,3 +24,10 @@ class AsyncResponseIterator(object):
async def __anext__(self):
return await self._next()
+
+
+async def get_first_message_with_timeout(stream: AsyncResponseIterator, timeout: int):
+ async def get_first_response():
+ return await stream.next()
+
+ return await asyncio.wait_for(get_first_response(), timeout)
diff --git a/contrib/python/ydb/py3/ydb/aio/driver.py b/contrib/python/ydb/py3/ydb/aio/driver.py
index 0f4f3630f96..9cd6fd2b74d 100644
--- a/contrib/python/ydb/py3/ydb/aio/driver.py
+++ b/contrib/python/ydb/py3/ydb/aio/driver.py
@@ -59,3 +59,7 @@ class Driver(pool.ConnectionPool):
self.scheme_client = scheme.SchemeClient(self)
self.table_client = table.TableClient(self, config.table_client_settings)
self.topic_client = topic.TopicClientAsyncIO(self, config.topic_client_settings)
+
+ async def stop(self, timeout=10):
+ await self.table_client._stop_pool_if_needed(timeout=timeout)
+ await super().stop(timeout=timeout)
diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py
index 6d116600c98..456896dbb50 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py
@@ -13,6 +13,7 @@ from ...retries import (
RetrySettings,
retry_operation_async,
)
+from ...query.base import QueryClientSettings
from ... import convert
from ..._grpc.grpcwrapper import common_utils
@@ -22,10 +23,18 @@ logger = logging.getLogger(__name__)
class QuerySessionPool:
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
- def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
+ def __init__(
+ self,
+ driver: common_utils.SupportedDriverType,
+ size: int = 100,
+ *,
+ query_client_settings: Optional[QueryClientSettings] = None,
+ loop: Optional[asyncio.AbstractEventLoop] = None,
+ ):
"""
:param driver: A driver instance
:param size: Size of session pool
+ :param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
"""
self._driver = driver
@@ -34,10 +43,11 @@ class QuerySessionPool:
self._queue = asyncio.Queue()
self._current_size = 0
self._waiters = 0
- self._loop = asyncio.get_running_loop()
+ self._loop = asyncio.get_running_loop() if loop is None else loop
+ self._query_client_settings = query_client_settings
async def _create_new_session(self):
- session = QuerySession(self._driver)
+ session = QuerySession(self._driver, settings=self._query_client_settings)
await session.create()
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
return session
diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py
index 779eb3f0235..0561de8c391 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/session.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/session.py
@@ -15,6 +15,7 @@ from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
from ...query import base
from ...query.session import (
BaseQuerySession,
+ DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
QuerySessionStateEnum,
)
@@ -43,9 +44,17 @@ class QuerySession(BaseQuerySession):
lambda response: common_utils.ServerStatus.from_proto(response),
)
- first_response = await self._status_stream.next()
- if first_response.status != issues.StatusCode.SUCCESS:
- pass
+ try:
+ first_response = await _utilities.get_first_message_with_timeout(
+ self._status_stream,
+ DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
+ )
+ if first_response.status != issues.StatusCode.SUCCESS:
+ raise RuntimeError("Failed to attach session")
+ except Exception as e:
+ self._state.reset()
+ self._status_stream.cancel()
+ raise e
self._state.set_attached(True)
self._state._change_state(QuerySessionStateEnum.CREATED)
diff --git a/contrib/python/ydb/py3/ydb/aio/table.py b/contrib/python/ydb/py3/ydb/aio/table.py
index 3c25f7d20ac..aec32e1a26e 100644
--- a/contrib/python/ydb/py3/ydb/aio/table.py
+++ b/contrib/python/ydb/py3/ydb/aio/table.py
@@ -3,6 +3,14 @@ import logging
import time
import typing
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Tuple,
+)
+
import ydb
from ydb import issues, settings as settings_impl, table
@@ -13,6 +21,7 @@ from ydb.table import (
_scan_query_request_factory,
_wrap_scan_query_response,
BaseTxContext,
+ TableDescription,
)
from . import _utilities
from ydb import _apis, _session_impl
@@ -139,6 +148,18 @@ class Session(BaseSession):
class TableClient(BaseTableClient):
+ def __init__(self, driver, table_client_settings=None):
+ # type:(ydb.Driver, ydb.TableClientSettings) -> None
+ super().__init__(driver=driver, table_client_settings=table_client_settings)
+ self._pool: Optional[SessionPool] = None
+
+ def __del__(self):
+ if self._pool is not None and not self._pool._terminating:
+ try:
+ asyncio.get_running_loop().create_task(self._stop_pool_if_needed())
+ except Exception:
+ pass
+
def session(self):
return Session(self._driver, self._table_client_settings)
@@ -158,6 +179,214 @@ class TableClient(BaseTableClient):
lambda resp: _wrap_scan_query_response(resp, self._table_client_settings),
)
+ def _init_pool_if_needed(self):
+ if self._pool is None:
+ self._pool = SessionPool(self._driver, 10)
+
+ async def _stop_pool_if_needed(self, timeout=10):
+ if self._pool is not None and not self._pool._terminating:
+ await self._pool.stop(timeout=timeout)
+ self._pool = None
+
+ async def create_table(
+ self,
+ path: str,
+ table_description: "TableDescription",
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Create a YDB table.
+
+ :param path: A table path
+ :param table_description: TableDescription instanse.
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ async def callee(session: Session):
+ return await session.create_table(path=path, table_description=table_description, settings=settings)
+
+ return await self._pool.retry_operation(callee)
+
+ async def drop_table(
+ self,
+ path: str,
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Drop a YDB table.
+
+ :param path: A table path
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ async def callee(session: Session):
+ return await session.drop_table(path=path, settings=settings)
+
+ return await self._pool.retry_operation(callee)
+
+ async def alter_table(
+ self,
+ path: str,
+ add_columns: Optional[List["ydb.Column"]] = None,
+ drop_columns: Optional[List[str]] = None,
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ alter_attributes: Optional[Optional[Dict[str, str]]] = None,
+ add_indexes: Optional[List["ydb.TableIndex"]] = None,
+ drop_indexes: Optional[List[str]] = None,
+ set_ttl_settings: Optional["ydb.TtlSettings"] = None,
+ drop_ttl_settings: Optional[Any] = None,
+ add_column_families: Optional[List["ydb.ColumnFamily"]] = None,
+ alter_column_families: Optional[List["ydb.ColumnFamily"]] = None,
+ alter_storage_settings: Optional["ydb.StorageSettings"] = None,
+ set_compaction_policy: Optional[str] = None,
+ alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None,
+ set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None,
+ set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Alter a YDB table.
+
+ :param path: A table path
+ :param add_columns: List of ydb.Column to add
+ :param drop_columns: List of column names to drop
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+ :param alter_attributes: Dict of attributes to alter
+ :param add_indexes: List of ydb.TableIndex to add
+ :param drop_indexes: List of index names to drop
+ :param set_ttl_settings: ydb.TtlSettings to set
+ :param drop_ttl_settings: Any to drop
+ :param add_column_families: List of ydb.ColumnFamily to add
+ :param alter_column_families: List of ydb.ColumnFamily to alter
+ :param alter_storage_settings: ydb.StorageSettings to alter
+ :param set_compaction_policy: Compaction policy
+ :param alter_partitioning_settings: ydb.PartitioningSettings to alter
+ :param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ async def callee(session: Session):
+ return await session.alter_table(
+ path=path,
+ add_columns=add_columns,
+ drop_columns=drop_columns,
+ settings=settings,
+ alter_attributes=alter_attributes,
+ add_indexes=add_indexes,
+ drop_indexes=drop_indexes,
+ set_ttl_settings=set_ttl_settings,
+ drop_ttl_settings=drop_ttl_settings,
+ add_column_families=add_column_families,
+ alter_column_families=alter_column_families,
+ alter_storage_settings=alter_storage_settings,
+ set_compaction_policy=set_compaction_policy,
+ alter_partitioning_settings=alter_partitioning_settings,
+ set_key_bloom_filter=set_key_bloom_filter,
+ set_read_replicas_settings=set_read_replicas_settings,
+ )
+
+ return await self._pool.retry_operation(callee)
+
+ async def describe_table(
+ self,
+ path: str,
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.TableSchemeEntry":
+ """
+ Describe a YDB table.
+
+ :param path: A table path
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: TableSchemeEntry or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ async def callee(session: Session):
+ return await session.describe_table(path=path, settings=settings)
+
+ return await self._pool.retry_operation(callee)
+
+ async def copy_table(
+ self,
+ source_path: str,
+ destination_path: str,
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Copy a YDB table.
+
+ :param source_path: A table path
+ :param destination_path: Destination table path
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ async def callee(session: Session):
+ return await session.copy_table(
+ source_path=source_path,
+ destination_path=destination_path,
+ settings=settings,
+ )
+
+ return await self._pool.retry_operation(callee)
+
+ async def copy_tables(
+ self,
+ source_destination_pairs: List[Tuple[str, str]],
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Copy a YDB tables.
+
+ :param source_destination_pairs: List of tuples (source_path, destination_path)
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ async def callee(session: Session):
+ return await session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings)
+
+ return await self._pool.retry_operation(callee)
+
+ async def rename_tables(
+ self,
+ rename_items: List[Tuple[str, str]],
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Rename a YDB tables.
+
+ :param rename_items: List of tuples (current_name, desired_name)
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ async def callee(session: Session):
+ return await session.rename_tables(rename_items=rename_items, settings=settings)
+
+ return await self._pool.retry_operation(callee)
+
class TxContext(BaseTxContext):
async def __aenter__(self):
diff --git a/contrib/python/ydb/py3/ydb/auth_helpers.py b/contrib/python/ydb/py3/ydb/auth_helpers.py
index 6399c3cfdf0..abf7331af6e 100644
--- a/contrib/python/ydb/py3/ydb/auth_helpers.py
+++ b/contrib/python/ydb/py3/ydb/auth_helpers.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import os
+from typing import Optional
def read_bytes(f):
@@ -7,8 +8,8 @@ def read_bytes(f):
return fr.read()
-def load_ydb_root_certificate():
- path = os.getenv("YDB_SSL_ROOT_CERTIFICATES_FILE", None)
+def load_ydb_root_certificate(path: Optional[str] = None):
+ path = path if path is not None else os.getenv("YDB_SSL_ROOT_CERTIFICATES_FILE", None)
if path is not None and os.path.exists(path):
return read_bytes(path)
return None
diff --git a/contrib/python/ydb/py3/ydb/driver.py b/contrib/python/ydb/py3/ydb/driver.py
index 1559b0d00bc..49bd223c901 100644
--- a/contrib/python/ydb/py3/ydb/driver.py
+++ b/contrib/python/ydb/py3/ydb/driver.py
@@ -89,6 +89,7 @@ class DriverConfig(object):
"secure_channel",
"table_client_settings",
"topic_client_settings",
+ "query_client_settings",
"endpoints",
"primary_user_agent",
"tracer",
@@ -112,6 +113,7 @@ class DriverConfig(object):
grpc_keep_alive_timeout=None,
table_client_settings=None,
topic_client_settings=None,
+ query_client_settings=None,
endpoints=None,
primary_user_agent="python-library",
tracer=None,
@@ -159,6 +161,7 @@ class DriverConfig(object):
self.grpc_keep_alive_timeout = grpc_keep_alive_timeout
self.table_client_settings = table_client_settings
self.topic_client_settings = topic_client_settings
+ self.query_client_settings = query_client_settings
self.primary_user_agent = primary_user_agent
self.tracer = tracer if tracer is not None else tracing.Tracer(None)
self.grpc_lb_policy_name = grpc_lb_policy_name
@@ -282,3 +285,7 @@ class Driver(pool.ConnectionPool):
self.scheme_client = scheme.SchemeClient(self)
self.table_client = table.TableClient(self, driver_config.table_client_settings)
self.topic_client = topic.TopicClient(self, driver_config.topic_client_settings)
+
+ def stop(self, timeout=10):
+ self.table_client._stop_pool_if_needed(timeout=timeout)
+ super().stop(timeout=timeout)
diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py
index 4c51a971ced..f1fcd17360e 100644
--- a/contrib/python/ydb/py3/ydb/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/query/pool.py
@@ -8,6 +8,7 @@ import time
import threading
import queue
+from .base import QueryClientSettings
from .session import (
QuerySession,
)
@@ -27,10 +28,17 @@ logger = logging.getLogger(__name__)
class QuerySessionPool:
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
- def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
+ def __init__(
+ self,
+ driver: common_utils.SupportedDriverType,
+ size: int = 100,
+ *,
+ query_client_settings: Optional[QueryClientSettings] = None,
+ ):
"""
:param driver: A driver instance.
:param size: Max size of Session Pool.
+ :param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
"""
self._driver = driver
@@ -39,9 +47,10 @@ class QuerySessionPool:
self._size = size
self._should_stop = threading.Event()
self._lock = threading.RLock()
+ self._query_client_settings = query_client_settings
def _create_new_session(self, timeout: Optional[float]):
- session = QuerySession(self._driver)
+ session = QuerySession(self._driver, settings=self._query_client_settings)
session.create(settings=BaseRequestSettings().with_timeout(timeout))
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
return session
diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py
index e13540d357b..382c922d5e6 100644
--- a/contrib/python/ydb/py3/ydb/query/session.py
+++ b/contrib/python/ydb/py3/ydb/query/session.py
@@ -22,6 +22,10 @@ from .transaction import QueryTxContext
logger = logging.getLogger(__name__)
+DEFAULT_ATTACH_FIRST_RESP_TIMEOUT = 600
+DEFAULT_ATTACH_LONG_TIMEOUT = 31536000 # year
+
+
class QuerySessionStateEnum(enum.Enum):
NOT_INITIALIZED = "NOT_INITIALIZED"
CREATED = "CREATED"
@@ -134,8 +138,25 @@ class BaseQuerySession:
def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
self._driver = driver
- self._settings = settings if settings is not None else base.QueryClientSettings()
+ self._settings = self._get_client_settings(driver, settings)
self._state = QuerySessionState(settings)
+ self._attach_settings: BaseRequestSettings = (
+ BaseRequestSettings()
+ .with_operation_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
+ .with_cancel_after(DEFAULT_ATTACH_LONG_TIMEOUT)
+ .with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
+ )
+
+ def _get_client_settings(
+ self,
+ driver: common_utils.SupportedDriverType,
+ settings: Optional[base.QueryClientSettings] = None,
+ ) -> base.QueryClientSettings:
+ if settings is not None:
+ return settings
+ if driver._driver_config.query_client_settings is not None:
+ return driver._driver_config.query_client_settings
+ return base.QueryClientSettings()
def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
return self._driver(
@@ -157,12 +178,12 @@ class BaseQuerySession:
settings=settings,
)
- def _attach_call(self, settings: Optional[BaseRequestSettings] = None) -> Iterable[_apis.ydb_query.SessionState]:
+ def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
return self._driver(
_apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id),
_apis.QueryService.Stub,
_apis.QueryService.AttachSession,
- settings=settings,
+ settings=self._attach_settings,
)
def _execute_call(
@@ -202,16 +223,24 @@ class QuerySession(BaseQuerySession):
_stream = None
- def _attach(self, settings: Optional[BaseRequestSettings] = None) -> None:
- self._stream = self._attach_call(settings=settings)
+ def _attach(self, first_resp_timeout: int = DEFAULT_ATTACH_FIRST_RESP_TIMEOUT) -> None:
+ self._stream = self._attach_call()
status_stream = _utilities.SyncResponseIterator(
self._stream,
lambda response: common_utils.ServerStatus.from_proto(response),
)
- first_response = next(status_stream)
- if first_response.status != issues.StatusCode.SUCCESS:
- pass
+ try:
+ first_response = _utilities.get_first_message_with_timeout(
+ status_stream,
+ first_resp_timeout,
+ )
+ if first_response.status != issues.StatusCode.SUCCESS:
+ raise RuntimeError("Failed to attach session")
+ except Exception as e:
+ self._state.reset()
+ status_stream.cancel()
+ raise e
self._state.set_attached(True)
self._state._change_state(QuerySessionStateEnum.CREATED)
@@ -219,7 +248,7 @@ class QuerySession(BaseQuerySession):
threading.Thread(
target=self._check_session_status_loop,
args=(status_stream,),
- name="check session status thread",
+ name="attach stream thread",
daemon=True,
).start()
diff --git a/contrib/python/ydb/py3/ydb/table.py b/contrib/python/ydb/py3/ydb/table.py
index 01f5e52b6ae..945e9187677 100644
--- a/contrib/python/ydb/py3/ydb/table.py
+++ b/contrib/python/ydb/py3/ydb/table.py
@@ -4,6 +4,15 @@ import ydb
from abc import abstractmethod
import logging
import enum
+import typing
+
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Tuple,
+)
from . import (
issues,
@@ -1193,6 +1202,14 @@ class BaseTableClient(ITableClient):
class TableClient(BaseTableClient):
+ def __init__(self, driver, table_client_settings=None):
+ # type:(ydb.Driver, ydb.TableClientSettings) -> None
+ super().__init__(driver=driver, table_client_settings=table_client_settings)
+ self._pool: Optional[SessionPool] = None
+
+ def __del__(self):
+ self._stop_pool_if_needed()
+
def async_scan_query(self, query, parameters=None, settings=None):
# type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.AsyncResponseIterator
request = _scan_query_request_factory(query, parameters, settings)
@@ -1219,6 +1236,213 @@ class TableClient(BaseTableClient):
(),
)
+ def _init_pool_if_needed(self):
+ if self._pool is None:
+ self._pool = SessionPool(self._driver, 10)
+
+ def _stop_pool_if_needed(self, timeout=10):
+ if self._pool is not None:
+ self._pool.stop(timeout=timeout)
+
+ def create_table(
+ self,
+ path: str,
+ table_description: "TableDescription",
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Create a YDB table.
+
+ :param path: A table path
+ :param table_description: TableDescription instanse.
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ def callee(session: Session):
+ return session.create_table(path=path, table_description=table_description, settings=settings)
+
+ return self._pool.retry_operation_sync(callee)
+
+ def drop_table(
+ self,
+ path: str,
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Drop a YDB table.
+
+ :param path: A table path
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ def callee(session: Session):
+ return session.drop_table(path=path, settings=settings)
+
+ return self._pool.retry_operation_sync(callee)
+
+ def alter_table(
+ self,
+ path: str,
+ add_columns: Optional[List["ydb.Column"]] = None,
+ drop_columns: Optional[List[str]] = None,
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ alter_attributes: Optional[Optional[Dict[str, str]]] = None,
+ add_indexes: Optional[List["ydb.TableIndex"]] = None,
+ drop_indexes: Optional[List[str]] = None,
+ set_ttl_settings: Optional["ydb.TtlSettings"] = None,
+ drop_ttl_settings: Optional[Any] = None,
+ add_column_families: Optional[List["ydb.ColumnFamily"]] = None,
+ alter_column_families: Optional[List["ydb.ColumnFamily"]] = None,
+ alter_storage_settings: Optional["ydb.StorageSettings"] = None,
+ set_compaction_policy: Optional[str] = None,
+ alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None,
+ set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None,
+ set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Alter a YDB table.
+
+ :param path: A table path
+ :param add_columns: List of ydb.Column to add
+ :param drop_columns: List of column names to drop
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+ :param alter_attributes: Dict of attributes to alter
+ :param add_indexes: List of ydb.TableIndex to add
+ :param drop_indexes: List of index names to drop
+ :param set_ttl_settings: ydb.TtlSettings to set
+ :param drop_ttl_settings: Any to drop
+ :param add_column_families: List of ydb.ColumnFamily to add
+ :param alter_column_families: List of ydb.ColumnFamily to alter
+ :param alter_storage_settings: ydb.StorageSettings to alter
+ :param set_compaction_policy: Compaction policy
+ :param alter_partitioning_settings: ydb.PartitioningSettings to alter
+ :param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ def callee(session: Session):
+ return session.alter_table(
+ path=path,
+ add_columns=add_columns,
+ drop_columns=drop_columns,
+ settings=settings,
+ alter_attributes=alter_attributes,
+ add_indexes=add_indexes,
+ drop_indexes=drop_indexes,
+ set_ttl_settings=set_ttl_settings,
+ drop_ttl_settings=drop_ttl_settings,
+ add_column_families=add_column_families,
+ alter_column_families=alter_column_families,
+ alter_storage_settings=alter_storage_settings,
+ set_compaction_policy=set_compaction_policy,
+ alter_partitioning_settings=alter_partitioning_settings,
+ set_key_bloom_filter=set_key_bloom_filter,
+ set_read_replicas_settings=set_read_replicas_settings,
+ )
+
+ return self._pool.retry_operation_sync(callee)
+
+ def describe_table(
+ self,
+ path: str,
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.TableSchemeEntry":
+ """
+ Describe a YDB table.
+
+ :param path: A table path
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: TableSchemeEntry or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ def callee(session: Session):
+ return session.describe_table(path=path, settings=settings)
+
+ return self._pool.retry_operation_sync(callee)
+
+ def copy_table(
+ self,
+ source_path: str,
+ destination_path: str,
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Copy a YDB table.
+
+ :param source_path: A table path
+ :param destination_path: Destination table path
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ def callee(session: Session):
+ return session.copy_table(
+ source_path=source_path,
+ destination_path=destination_path,
+ settings=settings,
+ )
+
+ return self._pool.retry_operation_sync(callee)
+
+ def copy_tables(
+ self,
+ source_destination_pairs: List[Tuple[str, str]],
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Copy a YDB tables.
+
+ :param source_destination_pairs: List of tuples (source_path, destination_path)
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ def callee(session: Session):
+ return session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings)
+
+ return self._pool.retry_operation_sync(callee)
+
+ def rename_tables(
+ self,
+ rename_items: List[Tuple[str, str]],
+ settings: Optional["settings_impl.BaseRequestSettings"] = None,
+ ) -> "ydb.Operation":
+ """
+ Rename a YDB tables.
+
+ :param rename_items: List of tuples (current_name, desired_name)
+ :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
+
+ :return: Operation or YDB error otherwise.
+ """
+
+ self._init_pool_if_needed()
+
+ def callee(session: Session):
+ return session.rename_tables(rename_items=rename_items, settings=settings)
+
+ return self._pool.retry_operation_sync(callee)
+
def _make_index_description(index):
result = TableIndex(index.name).with_index_columns(*tuple(col for col in index.index_columns))
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index f890bc7248e..6d13618e64c 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.18.1"
+VERSION = "3.18.10"