aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-09-11 19:05:28 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-09-11 19:15:44 +0300
commite0943acf1f045ab8975b567b099ab7152b063478 (patch)
tree43d801f7f8be2660cffffeef3a70b23d3c89bc99
parente6e46da89d32d38a3e8755812c6b8b4288c0cb59 (diff)
downloadydb-e0943acf1f045ab8975b567b099ab7152b063478.tar.gz
Intermediate changes
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA8
-rw-r--r--contrib/python/ydb/py3/README.md6
-rw-r--r--contrib/python/ydb/py3/ya.make2
-rw-r--r--contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py25
-rw-r--r--contrib/python/ydb/py3/ydb/aio/__init__.py2
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/__init__.py10
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/pool.py112
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/session.py25
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/transaction.py8
-rw-r--r--contrib/python/ydb/py3/ydb/query/__init__.py14
-rw-r--r--contrib/python/ydb/py3/ydb/query/base.py31
-rw-r--r--contrib/python/ydb/py3/ydb/query/pool.py131
-rw-r--r--contrib/python/ydb/py3/ydb/query/session.py40
-rw-r--r--contrib/python/ydb/py3/ydb/query/transaction.py6
-rw-r--r--contrib/python/ydb/py3/ydb/settings.py20
-rw-r--r--contrib/python/ydb/py3/ydb/table.py4
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
17 files changed, 353 insertions, 93 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 4397bad877..db2f0036b3 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.16.1
+Version: 3.17.1
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
@@ -30,6 +30,12 @@ YDB Python SDK
Officially supported Python client for YDB.
+---
+
+**Documentation**: <a href="https://ydb-platform.github.io/ydb-python-sdk" target="_blank">https://ydb-platform.github.io/ydb-python-sdk</a>
+
+---
+
## Quickstart
### Prerequisites
diff --git a/contrib/python/ydb/py3/README.md b/contrib/python/ydb/py3/README.md
index cfc57eb276..db7c3de271 100644
--- a/contrib/python/ydb/py3/README.md
+++ b/contrib/python/ydb/py3/README.md
@@ -7,6 +7,12 @@ YDB Python SDK
Officially supported Python client for YDB.
+---
+
+**Documentation**: <a href="https://ydb-platform.github.io/ydb-python-sdk" target="_blank">https://ydb-platform.github.io/ydb-python-sdk</a>
+
+---
+
## Quickstart
### Prerequisites
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index 8c6877ee58..c1ab6d4472 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.16.1)
+VERSION(3.17.1)
LICENSE(Apache-2.0)
diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py
index 3ef2d55430..0b5ec41df7 100644
--- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py
+++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py
@@ -10,6 +10,8 @@ except ImportError:
class BaseQueryTxMode(IToProto):
+ """Abstract class for Query Transaction Modes."""
+
@property
@abc.abstractmethod
def name(self) -> str:
@@ -17,6 +19,11 @@ class BaseQueryTxMode(IToProto):
class QuerySnapshotReadOnly(BaseQueryTxMode):
+ """All the read operations within a transaction access the database snapshot.
+ All the data reads are consistent. The snapshot is taken when the transaction begins,
+ meaning the transaction sees all changes committed before it began.
+ """
+
def __init__(self):
self._name = "snapshot_read_only"
@@ -29,6 +36,10 @@ class QuerySnapshotReadOnly(BaseQueryTxMode):
class QuerySerializableReadWrite(BaseQueryTxMode):
+ """This mode guarantees that the result of successful parallel transactions is equivalent
+ to their serial execution, and there are no read anomalies for successful transactions.
+ """
+
def __init__(self):
self._name = "serializable_read_write"
@@ -41,6 +52,15 @@ class QuerySerializableReadWrite(BaseQueryTxMode):
class QueryOnlineReadOnly(BaseQueryTxMode):
+ """Each read operation in the transaction is reading the data that is most recent at execution time.
+ The consistency of retrieved data depends on the allow_inconsistent_reads setting:
+ * false (consistent reads): Each individual read operation returns consistent data,
+ but no consistency is guaranteed between reads.
+ Reading the same table range twice may return different results.
+ * true (inconsistent reads): Even the data fetched by a particular
+ read operation may contain inconsistent results.
+ """
+
def __init__(self, allow_inconsistent_reads: bool = False):
self.allow_inconsistent_reads = allow_inconsistent_reads
self._name = "online_read_only"
@@ -54,6 +74,11 @@ class QueryOnlineReadOnly(BaseQueryTxMode):
class QueryStaleReadOnly(BaseQueryTxMode):
+ """Read operations within a transaction may return results that are slightly out-of-date
+ (lagging by fractions of a second). Each individual read returns consistent data,
+ but no consistency between different reads is guaranteed.
+ """
+
def __init__(self):
self._name = "stale_read_only"
diff --git a/contrib/python/ydb/py3/ydb/aio/__init__.py b/contrib/python/ydb/py3/ydb/aio/__init__.py
index 0e7d4e747a..1c9c887c22 100644
--- a/contrib/python/ydb/py3/ydb/aio/__init__.py
+++ b/contrib/python/ydb/py3/ydb/aio/__init__.py
@@ -1,3 +1,3 @@
from .driver import Driver # noqa
from .table import SessionPool, retry_operation # noqa
-from .query import QuerySessionPoolAsync, QuerySessionAsync # noqa
+from .query import QuerySessionPool, QuerySession, QueryTxContext # noqa
diff --git a/contrib/python/ydb/py3/ydb/aio/query/__init__.py b/contrib/python/ydb/py3/ydb/aio/query/__init__.py
index 829d7b54cf..8e7dd4fdff 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/__init__.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/__init__.py
@@ -1,7 +1,9 @@
__all__ = [
- "QuerySessionPoolAsync",
- "QuerySessionAsync",
+ "QuerySessionPool",
+ "QuerySession",
+ "QueryTxContext",
]
-from .pool import QuerySessionPoolAsync
-from .session import QuerySessionAsync
+from .pool import QuerySessionPool
+from .session import QuerySession
+from .transaction import QueryTxContext
diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py
index f91f7465e4..e8d53438fc 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py
@@ -1,3 +1,4 @@
+import asyncio
import logging
from typing import (
Callable,
@@ -6,7 +7,7 @@ from typing import (
)
from .session import (
- QuerySessionAsync,
+ QuerySession,
)
from ...retries import (
RetrySettings,
@@ -18,20 +19,85 @@ from ..._grpc.grpcwrapper import common_utils
logger = logging.getLogger(__name__)
-class QuerySessionPoolAsync:
- """QuerySessionPoolAsync is an object to simplify operations with sessions of Query Service."""
+class QuerySessionPool:
+ """QuerySessionPool is an object to simplify operations with sessions of Query Service."""
- def __init__(self, driver: common_utils.SupportedDriverType):
+ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
"""
:param driver: A driver instance
+ :param size: Size of session pool
"""
- logger.warning("QuerySessionPoolAsync is an experimental API, which could be changed.")
+ logger.warning("QuerySessionPool is an experimental API, which could be changed.")
self._driver = driver
+ self._size = size
+ self._should_stop = asyncio.Event()
+ self._queue = asyncio.Queue()
+ self._current_size = 0
+ self._waiters = 0
+ self._loop = asyncio.get_running_loop()
+
+ async def _create_new_session(self):
+ session = QuerySession(self._driver)
+ await session.create()
+ logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
+ return session
+
+ async def acquire(self) -> QuerySession:
+ """WARNING: This API is experimental and could be changed.
+
+ Acquire a session from Session Pool.
+
+ :return A QuerySession object.
+ """
+
+ if self._should_stop.is_set():
+ logger.error("An attempt to take session from closed session pool.")
+ raise RuntimeError("An attempt to take session from closed session pool.")
+
+ session = None
+ try:
+ session = self._queue.get_nowait()
+ except asyncio.QueueEmpty:
+ pass
+
+ if session is None and self._current_size == self._size:
+ queue_get = asyncio.ensure_future(self._queue.get())
+ task_stop = asyncio.ensure_future(asyncio.ensure_future(self._should_stop.wait()))
+ done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED)
+ if task_stop in done:
+ queue_get.cancel()
+ raise RuntimeError("An attempt to take session from closed session pool.")
+
+ task_stop.cancel()
+ session = queue_get.result()
+
+ if session is not None:
+ if session._state.attached:
+ logger.debug(f"Acquired active session from queue: {session._state.session_id}")
+ return session
+ else:
+ self._current_size -= 1
+ logger.debug(f"Acquired dead session from queue: {session._state.session_id}")
+
+ logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
+ session = await self._create_new_session()
+ self._current_size += 1
+ return session
+
+ async def release(self, session: QuerySession) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Release a session back to Session Pool.
+ """
+
+ self._queue.put_nowait(session)
+ logger.debug("Session returned to queue: %s", session._state.session_id)
def checkout(self) -> "SimpleQuerySessionCheckoutAsync":
"""WARNING: This API is experimental and could be changed.
- Return a Session context manager, that opens session on enter and closes session on exit.
+
+ Return a Session context manager, that acquires session on enter and releases session on exit.
"""
return SimpleQuerySessionCheckoutAsync(self)
@@ -40,6 +106,7 @@ class QuerySessionPoolAsync:
self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs
):
"""WARNING: This API is experimental and could be changed.
+
Special interface to execute a bunch of commands with session in a safe, retriable way.
:param callee: A function, that works with session.
@@ -65,6 +132,7 @@ class QuerySessionPoolAsync:
**kwargs,
) -> List[convert.ResultSet]:
"""WARNING: This API is experimental and could be changed.
+
Special interface to execute a one-shot queries in a safe, retriable way.
Note: this method loads all data from stream before return, do not use this
method with huge read queries.
@@ -85,8 +153,20 @@ class QuerySessionPoolAsync:
return await retry_operation_async(wrapped_callee, retry_settings)
- async def stop(self, timeout=None):
- pass # TODO: implement
+ async def stop(self):
+ self._should_stop.set()
+
+ tasks = []
+ while True:
+ try:
+ session = self._queue.get_nowait()
+ tasks.append(session.delete())
+ except asyncio.QueueEmpty:
+ break
+
+ await asyncio.gather(*tasks)
+
+ logger.debug("All session were deleted.")
async def __aenter__(self):
return self
@@ -94,15 +174,21 @@ class QuerySessionPoolAsync:
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
+ def __del__(self):
+ if self._should_stop.is_set() or self._loop.is_closed():
+ return
+
+ self._loop.call_soon(self.stop)
+
class SimpleQuerySessionCheckoutAsync:
- def __init__(self, pool: QuerySessionPoolAsync):
+ def __init__(self, pool: QuerySessionPool):
self._pool = pool
- self._session = QuerySessionAsync(pool._driver)
+ self._session = None
- async def __aenter__(self) -> QuerySessionAsync:
- await self._session.create()
+ async def __aenter__(self) -> QuerySession:
+ self._session = await self._pool.acquire()
return self._session
async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self._session.delete()
+ await self._pool.release(self._session)
diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py
index 627a41d895..4c1c1a10fc 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/session.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/session.py
@@ -5,9 +5,10 @@ from typing import (
)
from .base import AsyncResponseContextIterator
-from .transaction import QueryTxContextAsync
+from .transaction import QueryTxContext
from .. import _utilities
from ... import issues
+from ...settings import BaseRequestSettings
from ..._grpc.grpcwrapper import common_utils
from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
@@ -18,7 +19,7 @@ from ...query.session import (
)
-class QuerySessionAsync(BaseQuerySession):
+class QuerySession(BaseQuerySession):
"""Session object for Query Service. It is not recommended to control
session's lifecycle manually - use a QuerySessionPool is always a better choise.
"""
@@ -32,7 +33,7 @@ class QuerySessionAsync(BaseQuerySession):
settings: Optional[base.QueryClientSettings] = None,
loop: asyncio.AbstractEventLoop = None,
):
- super(QuerySessionAsync, self).__init__(driver, settings)
+ super(QuerySession, self).__init__(driver, settings)
self._loop = loop if loop is not None else asyncio.get_running_loop()
async def _attach(self) -> None:
@@ -62,7 +63,7 @@ class QuerySessionAsync(BaseQuerySession):
self._state.reset()
self._state._change_state(QuerySessionStateEnum.CLOSED)
- async def delete(self) -> None:
+ async def delete(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""WARNING: This API is experimental and could be changed.
Deletes a Session of Query Service on server side and releases resources.
@@ -73,30 +74,30 @@ class QuerySessionAsync(BaseQuerySession):
return
self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED)
- await self._delete_call()
+ await self._delete_call(settings=settings)
self._stream.cancel()
- async def create(self) -> "QuerySessionAsync":
+ async def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySession":
"""WARNING: This API is experimental and could be changed.
Creates a Session of Query Service on server side and attaches it.
- :return: QuerySessionSync object.
+ :return: QuerySession object.
"""
if self._state._already_in(QuerySessionStateEnum.CREATED):
return
self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
- await self._create_call()
+ await self._create_call(settings=settings)
await self._attach()
return self
- def transaction(self, tx_mode=None) -> QueryTxContextAsync:
+ def transaction(self, tx_mode=None) -> QueryTxContext:
self._state._check_session_ready_to_use()
tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
- return QueryTxContextAsync(
+ return QueryTxContext(
self._driver,
self._state,
self,
@@ -110,10 +111,12 @@ class QuerySessionAsync(BaseQuerySession):
syntax: base.QuerySyntax = None,
exec_mode: base.QueryExecMode = None,
concurrent_result_sets: bool = False,
+ settings: Optional[BaseRequestSettings] = None,
) -> AsyncResponseContextIterator:
"""WARNING: This API is experimental and could be changed.
Sends a query to Query Service
+
:param query: (YQL or SQL text) to be executed.
:param syntax: Syntax of the query, which is a one from the following choises:
1) QuerySyntax.YQL_V1, which is default;
@@ -132,6 +135,7 @@ class QuerySessionAsync(BaseQuerySession):
exec_mode=exec_mode,
parameters=parameters,
concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
)
return AsyncResponseContextIterator(
@@ -139,6 +143,7 @@ class QuerySessionAsync(BaseQuerySession):
lambda resp: base.wrap_execute_query_response(
rpc_state=None,
response_pb=resp,
+ session_state=self._state,
settings=self._settings,
),
)
diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py
index 429ba125c8..b115a4b48b 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/transaction.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py
@@ -15,8 +15,8 @@ from ...query.transaction import (
logger = logging.getLogger(__name__)
-class QueryTxContextAsync(BaseQueryTxContext):
- async def __aenter__(self) -> "QueryTxContextAsync":
+class QueryTxContext(BaseQueryTxContext):
+ async def __aenter__(self) -> "QueryTxContext":
"""
Enters a context manager and returns a transaction
@@ -47,7 +47,7 @@ class QueryTxContextAsync(BaseQueryTxContext):
pass
self._prev_stream = None
- async def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContextAsync":
+ async def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContext":
"""WARNING: This API is experimental and could be changed.
Explicitly begins a transaction
@@ -114,6 +114,7 @@ class QueryTxContextAsync(BaseQueryTxContext):
"""WARNING: This API is experimental and could be changed.
Sends a query to Query Service
+
:param query: (YQL or SQL text) to be executed.
:param parameters: dict with parameters and YDB types;
:param commit_tx: A special flag that allows transaction commit.
@@ -146,6 +147,7 @@ class QueryTxContextAsync(BaseQueryTxContext):
lambda resp: base.wrap_execute_query_response(
rpc_state=None,
response_pb=resp,
+ session_state=self._session_state,
tx=self,
commit_tx=commit_tx,
settings=self.session._settings,
diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py
index 40e512cd6b..0f8187892f 100644
--- a/contrib/python/ydb/py3/ydb/query/__init__.py
+++ b/contrib/python/ydb/py3/ydb/query/__init__.py
@@ -1,11 +1,13 @@
__all__ = [
+ "BaseQueryTxMode",
"QueryOnlineReadOnly",
"QuerySerializableReadWrite",
"QuerySnapshotReadOnly",
"QueryStaleReadOnly",
"QuerySessionPool",
- "QueryClientSync",
- "QuerySessionSync",
+ "QueryClientSettings",
+ "QuerySession",
+ "QueryTxContext",
]
import logging
@@ -14,10 +16,12 @@ from .base import (
QueryClientSettings,
)
-from .session import QuerySessionSync
+from .session import QuerySession
+from .transaction import QueryTxContext
from .._grpc.grpcwrapper import common_utils
from .._grpc.grpcwrapper.ydb_query_public_types import (
+ BaseQueryTxMode,
QueryOnlineReadOnly,
QuerySerializableReadWrite,
QuerySnapshotReadOnly,
@@ -35,5 +39,5 @@ class QueryClientSync:
self._driver = driver
self._settings = query_client_settings
- def session(self) -> QuerySessionSync:
- return QuerySessionSync(self._driver, self._settings)
+ def session(self) -> QuerySession:
+ return QuerySession(self._driver, self._settings)
diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py
index 55087d0c4e..9372cbcf54 100644
--- a/contrib/python/ydb/py3/ydb/query/base.py
+++ b/contrib/python/ydb/py3/ydb/query/base.py
@@ -165,28 +165,31 @@ def create_execute_query_request(
)
+def bad_session_handler(func):
+ @functools.wraps(func)
+ def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args, **kwargs):
+ try:
+ return func(rpc_state, response_pb, session_state, *args, **kwargs)
+ except issues.BadSession:
+ session_state.reset()
+ raise
+
+ return decorator
+
+
+@bad_session_handler
def wrap_execute_query_response(
rpc_state: RpcState,
response_pb: _apis.ydb_query.ExecuteQueryResponsePart,
+ session_state: IQuerySessionState,
tx: Optional["BaseQueryTxContext"] = None,
commit_tx: Optional[bool] = False,
settings: Optional[QueryClientSettings] = None,
) -> convert.ResultSet:
issues._process_response(response_pb)
- if tx and response_pb.tx_meta and not tx.tx_id:
- tx._move_to_beginned(response_pb.tx_meta.id)
if tx and commit_tx:
tx._move_to_commited()
- return convert.ResultSet.from_message(response_pb.result_set, settings)
-
-
-def bad_session_handler(func):
- @functools.wraps(func)
- def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args, **kwargs):
- try:
- return func(rpc_state, response_pb, session_state, *args, **kwargs)
- except issues.BadSession:
- session_state.reset()
- raise
+ elif tx and response_pb.tx_meta and not tx.tx_id:
+ tx._move_to_beginned(response_pb.tx_meta.id)
- return decorator
+ return convert.ResultSet.from_message(response_pb.result_set, settings)
diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py
index afe39f0623..839d8688ca 100644
--- a/contrib/python/ydb/py3/ydb/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/query/pool.py
@@ -4,15 +4,20 @@ from typing import (
Optional,
List,
)
+import time
+import threading
+import queue
from .session import (
- QuerySessionSync,
+ QuerySession,
)
from ..retries import (
RetrySettings,
retry_operation_sync,
)
+from .. import issues
from .. import convert
+from ..settings import BaseRequestSettings
from .._grpc.grpcwrapper import common_utils
@@ -22,23 +27,102 @@ logger = logging.getLogger(__name__)
class QuerySessionPool:
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
- def __init__(self, driver: common_utils.SupportedDriverType):
+ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
"""
- :param driver: A driver instance
+ :param driver: A driver instance.
+ :param size: Max size of Session Pool.
"""
logger.warning("QuerySessionPool is an experimental API, which could be changed.")
self._driver = driver
+ self._queue = queue.Queue()
+ self._current_size = 0
+ self._size = size
+ self._should_stop = threading.Event()
+ self._lock = threading.RLock()
+
+ def _create_new_session(self, timeout: Optional[float]):
+ session = QuerySession(self._driver)
+ session.create(settings=BaseRequestSettings().with_timeout(timeout))
+ logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
+ return session
+
+ def acquire(self, timeout: Optional[float] = None) -> QuerySession:
+ """WARNING: This API is experimental and could be changed.
+
+ Acquire a session from Session Pool.
+
+ :param timeout: A timeout to wait in seconds.
+
+ :return A QuerySession object.
+ """
+
+ start = time.monotonic()
+
+ lock_acquire_timeout = timeout if timeout is not None else -1
+ acquired = self._lock.acquire(timeout=lock_acquire_timeout)
+ try:
+ if self._should_stop.is_set():
+ logger.error("An attempt to take session from closed session pool.")
+ raise RuntimeError("An attempt to take session from closed session pool.")
+
+ session = None
+ try:
+ session = self._queue.get_nowait()
+ except queue.Empty:
+ pass
+
+ finish = time.monotonic()
+ timeout = timeout - (finish - start) if timeout is not None else None
+
+ start = time.monotonic()
+ if session is None and self._current_size == self._size:
+ try:
+ session = self._queue.get(block=True, timeout=timeout)
+ except queue.Empty:
+ raise issues.SessionPoolEmpty("Timeout on acquire session")
+
+ if session is not None:
+ if session._state.attached:
+ logger.debug(f"Acquired active session from queue: {session._state.session_id}")
+ return session
+ else:
+ self._current_size -= 1
+ logger.debug(f"Acquired dead session from queue: {session._state.session_id}")
+
+ logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
+ finish = time.monotonic()
+ time_left = timeout - (finish - start) if timeout is not None else None
+ session = self._create_new_session(time_left)
+
+ self._current_size += 1
+ return session
+ finally:
+ if acquired:
+ self._lock.release()
+
+ def release(self, session: QuerySession) -> None:
+ """WARNING: This API is experimental and could be changed.
+
+ Release a session back to Session Pool.
+ """
+
+ self._queue.put_nowait(session)
+ logger.debug("Session returned to queue: %s", session._state.session_id)
- def checkout(self) -> "SimpleQuerySessionCheckout":
+ def checkout(self, timeout: Optional[float] = None) -> "SimpleQuerySessionCheckout":
"""WARNING: This API is experimental and could be changed.
- Return a Session context manager, that opens session on enter and closes session on exit.
+
+ Return a Session context manager, that acquires session on enter and releases session on exit.
+
+ :param timeout: A timeout to wait in seconds.
"""
- return SimpleQuerySessionCheckout(self)
+ return SimpleQuerySessionCheckout(self, timeout)
def retry_operation_sync(self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs):
"""WARNING: This API is experimental and could be changed.
+
Special interface to execute a bunch of commands with session in a safe, retriable way.
:param callee: A function, that works with session.
@@ -50,7 +134,7 @@ class QuerySessionPool:
retry_settings = RetrySettings() if retry_settings is None else retry_settings
def wrapped_callee():
- with self.checkout() as session:
+ with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session:
return callee(session, *args, **kwargs)
return retry_operation_sync(wrapped_callee, retry_settings)
@@ -64,6 +148,7 @@ class QuerySessionPool:
**kwargs,
) -> List[convert.ResultSet]:
"""WARNING: This API is experimental and could be changed.
+
Special interface to execute a one-shot queries in a safe, retriable way.
Note: this method loads all data from stream before return, do not use this
method with huge read queries.
@@ -78,14 +163,28 @@ class QuerySessionPool:
retry_settings = RetrySettings() if retry_settings is None else retry_settings
def wrapped_callee():
- with self.checkout() as session:
+ with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session:
it = session.execute(query, parameters, *args, **kwargs)
return [result_set for result_set in it]
return retry_operation_sync(wrapped_callee, retry_settings)
def stop(self, timeout=None):
- pass # TODO: implement
+ acquire_timeout = timeout if timeout is not None else -1
+ acquired = self._lock.acquire(timeout=acquire_timeout)
+ try:
+ self._should_stop.set()
+ while True:
+ try:
+ session = self._queue.get_nowait()
+ session.delete()
+ except queue.Empty:
+ break
+
+ logger.debug("All session were deleted.")
+ finally:
+ if acquired:
+ self._lock.release()
def __enter__(self):
return self
@@ -93,15 +192,19 @@ class QuerySessionPool:
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
+ def __del__(self):
+ self.stop()
+
class SimpleQuerySessionCheckout:
- def __init__(self, pool: QuerySessionPool):
+ def __init__(self, pool: QuerySessionPool, timeout: Optional[float]):
self._pool = pool
- self._session = QuerySessionSync(pool._driver)
+ self._timeout = timeout
+ self._session = None
- def __enter__(self) -> QuerySessionSync:
- self._session.create()
+ def __enter__(self) -> QuerySession:
+ self._session = self._pool.acquire(self._timeout)
return self._session
def __exit__(self, exc_type, exc_val, exc_tb):
- self._session.delete()
+ self._pool.release(self._session)
diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py
index 4b051dc16f..5b4db26c92 100644
--- a/contrib/python/ydb/py3/ydb/query/session.py
+++ b/contrib/python/ydb/py3/ydb/query/session.py
@@ -10,12 +10,13 @@ from typing import (
from . import base
from .. import _apis, issues, _utilities
+from ..settings import BaseRequestSettings
from ..connection import _RpcState as RpcState
from .._grpc.grpcwrapper import common_utils
from .._grpc.grpcwrapper import ydb_query as _ydb_query
from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
-from .transaction import QueryTxContextSync
+from .transaction import QueryTxContext
logger = logging.getLogger(__name__)
@@ -136,29 +137,32 @@ class BaseQuerySession:
self._settings = settings if settings is not None else base.QueryClientSettings()
self._state = QuerySessionState(settings)
- def _create_call(self) -> "BaseQuerySession":
+ def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
return self._driver(
_apis.ydb_query.CreateSessionRequest(),
_apis.QueryService.Stub,
_apis.QueryService.CreateSession,
wrap_result=wrapper_create_session,
wrap_args=(self._state, self),
+ settings=settings,
)
- def _delete_call(self) -> "BaseQuerySession":
+ def _delete_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
return self._driver(
_apis.ydb_query.DeleteSessionRequest(session_id=self._state.session_id),
_apis.QueryService.Stub,
_apis.QueryService.DeleteSession,
wrap_result=wrapper_delete_session,
wrap_args=(self._state, self),
+ settings=settings,
)
- def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
+ def _attach_call(self, settings: Optional[BaseRequestSettings] = None) -> Iterable[_apis.ydb_query.SessionState]:
return self._driver(
_apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id),
_apis.QueryService.Stub,
_apis.QueryService.AttachSession,
+ settings=settings,
)
def _execute_call(
@@ -169,6 +173,7 @@ class BaseQuerySession:
exec_mode: base.QueryExecMode = None,
parameters: dict = None,
concurrent_result_sets: bool = False,
+ settings: Optional[BaseRequestSettings] = None,
) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
request = base.create_execute_query_request(
query=query,
@@ -186,18 +191,19 @@ class BaseQuerySession:
request.to_proto(),
_apis.QueryService.Stub,
_apis.QueryService.ExecuteQuery,
+ settings=settings,
)
-class QuerySessionSync(BaseQuerySession):
+class QuerySession(BaseQuerySession):
"""Session object for Query Service. It is not recommended to control
session's lifecycle manually - use a QuerySessionPool is always a better choise.
"""
_stream = None
- def _attach(self) -> None:
- self._stream = self._attach_call()
+ def _attach(self, settings: Optional[BaseRequestSettings] = None) -> None:
+ self._stream = self._attach_call(settings=settings)
status_stream = _utilities.SyncResponseIterator(
self._stream,
lambda response: common_utils.ServerStatus.from_proto(response),
@@ -228,7 +234,7 @@ class QuerySessionSync(BaseQuerySession):
self._state.reset()
self._state._change_state(QuerySessionStateEnum.CLOSED)
- def delete(self) -> None:
+ def delete(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""WARNING: This API is experimental and could be changed.
Deletes a Session of Query Service on server side and releases resources.
@@ -239,29 +245,31 @@ class QuerySessionSync(BaseQuerySession):
return
self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED)
- self._delete_call()
+ self._delete_call(settings=settings)
self._stream.cancel()
- def create(self) -> "QuerySessionSync":
+ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySession":
"""WARNING: This API is experimental and could be changed.
Creates a Session of Query Service on server side and attaches it.
- :return: QuerySessionSync object.
+ :return: QuerySession object.
"""
if self._state._already_in(QuerySessionStateEnum.CREATED):
return
self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
- self._create_call()
+
+ self._create_call(settings=settings)
self._attach()
return self
- def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContextSync:
+ def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContext:
"""WARNING: This API is experimental and could be changed.
Creates a transaction context manager with specified transaction mode.
+
:param tx_mode: Transaction mode, which is a one from the following choises:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
@@ -275,7 +283,7 @@ class QuerySessionSync(BaseQuerySession):
tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
- return QueryTxContextSync(
+ return QueryTxContext(
self._driver,
self._state,
self,
@@ -289,10 +297,12 @@ class QuerySessionSync(BaseQuerySession):
syntax: base.QuerySyntax = None,
exec_mode: base.QueryExecMode = None,
concurrent_result_sets: bool = False,
+ settings: Optional[BaseRequestSettings] = None,
) -> base.SyncResponseContextIterator:
"""WARNING: This API is experimental and could be changed.
Sends a query to Query Service
+
:param query: (YQL or SQL text) to be executed.
:param syntax: Syntax of the query, which is a one from the following choises:
1) QuerySyntax.YQL_V1, which is default;
@@ -311,6 +321,7 @@ class QuerySessionSync(BaseQuerySession):
exec_mode=exec_mode,
parameters=parameters,
concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
)
return base.SyncResponseContextIterator(
@@ -318,6 +329,7 @@ class QuerySessionSync(BaseQuerySession):
lambda resp: base.wrap_execute_query_response(
rpc_state=None,
response_pb=resp,
+ session_state=self._state,
settings=self._settings,
),
)
diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py
index be7396b1a5..21ba02798b 100644
--- a/contrib/python/ydb/py3/ydb/query/transaction.py
+++ b/contrib/python/ydb/py3/ydb/query/transaction.py
@@ -294,7 +294,7 @@ class BaseQueryTxContext:
self._tx_state._change_state(QueryTxStateEnum.COMMITTED)
-class QueryTxContextSync(BaseQueryTxContext):
+class QueryTxContext(BaseQueryTxContext):
def __enter__(self) -> "BaseQueryTxContext":
"""
Enters a context manager and returns a transaction
@@ -326,7 +326,7 @@ class QueryTxContextSync(BaseQueryTxContext):
pass
self._prev_stream = None
- def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContextSync":
+ def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContext":
"""WARNING: This API is experimental and could be changed.
Explicitly begins a transaction
@@ -394,6 +394,7 @@ class QueryTxContextSync(BaseQueryTxContext):
"""WARNING: This API is experimental and could be changed.
Sends a query to Query Service
+
:param query: (YQL or SQL text) to be executed.
:param parameters: dict with parameters and YDB types;
:param commit_tx: A special flag that allows transaction commit.
@@ -427,6 +428,7 @@ class QueryTxContextSync(BaseQueryTxContext):
lambda resp: base.wrap_execute_query_response(
rpc_state=None,
response_pb=resp,
+ session_state=self._session_state,
tx=self,
commit_tx=commit_tx,
settings=self.session._settings,
diff --git a/contrib/python/ydb/py3/ydb/settings.py b/contrib/python/ydb/py3/ydb/settings.py
index 6739a46fab..019b75a8ec 100644
--- a/contrib/python/ydb/py3/ydb/settings.py
+++ b/contrib/python/ydb/py3/ydb/settings.py
@@ -39,7 +39,7 @@ class BaseRequestSettings(object):
.with_need_rpc_auth(self.need_rpc_auth)
)
- def with_compression(self, compression):
+ def with_compression(self, compression) -> "BaseRequestSettings":
"""
Enables compression for the specific RPC
:param compression: An RPCCompression enum value.
@@ -48,11 +48,11 @@ class BaseRequestSettings(object):
self.compression = compression
return self
- def with_need_rpc_auth(self, need_rpc_auth):
+ def with_need_rpc_auth(self, need_rpc_auth) -> "BaseRequestSettings":
self.need_rpc_auth = need_rpc_auth
return self
- def with_header(self, key, value):
+ def with_header(self, key, value) -> "BaseRequestSettings":
"""
Adds a key-value pair to the request headers.
:param key: A string with a header key.
@@ -62,7 +62,7 @@ class BaseRequestSettings(object):
self.headers.append((key, value))
return self
- def with_trace_id(self, trace_id):
+ def with_trace_id(self, trace_id) -> "BaseRequestSettings":
"""
Includes trace id for RPC headers
:param trace_id: A trace id string
@@ -71,7 +71,7 @@ class BaseRequestSettings(object):
self.trace_id = trace_id
return self
- def with_request_type(self, request_type):
+ def with_request_type(self, request_type) -> "BaseRequestSettings":
"""
Includes request type for RPC headers
:param request_type: A request type string
@@ -80,7 +80,7 @@ class BaseRequestSettings(object):
self.request_type = request_type
return self
- def with_operation_timeout(self, timeout):
+ def with_operation_timeout(self, timeout) -> "BaseRequestSettings":
"""
Indicates that client is no longer interested in the result of operation after the specified duration
starting from the time operation arrives at the server.
@@ -89,12 +89,12 @@ class BaseRequestSettings(object):
Timeout of operation does not tell anything about its result, it might be completed successfully
or cancelled on server.
:param timeout:
- :return:
+ :return: The self instance
"""
self.operation_timeout = timeout
return self
- def with_cancel_after(self, timeout):
+ def with_cancel_after(self, timeout) -> "BaseRequestSettings":
"""
Server will try to cancel the operation after the specified duration starting from the time
the operation arrives at server.
@@ -102,12 +102,12 @@ class BaseRequestSettings(object):
sent back to client if it was waiting for the operation result.
In case when cancellation isn't possible, no action will be performed.
:param timeout:
- :return:
+ :return: The self instance
"""
self.cancel_after = timeout
return self
- def with_timeout(self, timeout):
+ def with_timeout(self, timeout) -> "BaseRequestSettings":
"""
Client-side timeout to complete request.
Since YDB doesn't support request cancellation at this moment, this feature should be
diff --git a/contrib/python/ydb/py3/ydb/table.py b/contrib/python/ydb/py3/ydb/table.py
index ac9f93042c..cfcffb17af 100644
--- a/contrib/python/ydb/py3/ydb/table.py
+++ b/contrib/python/ydb/py3/ydb/table.py
@@ -297,6 +297,10 @@ class TableIndex(object):
self._pb.global_index.SetInParent()
return self
+ def with_global_async_index(self):
+ self._pb.global_async_index.SetInParent()
+ return self
+
def with_index_columns(self, *columns):
for column in columns:
self._pb.index_columns.append(column)
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index 96a1189198..b0ef9f368d 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.16.1"
+VERSION = "3.17.1"