diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-09-11 19:05:28 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-09-11 19:15:44 +0300 |
commit | e0943acf1f045ab8975b567b099ab7152b063478 (patch) | |
tree | 43d801f7f8be2660cffffeef3a70b23d3c89bc99 /contrib | |
parent | e6e46da89d32d38a3e8755812c6b8b4288c0cb59 (diff) | |
download | ydb-e0943acf1f045ab8975b567b099ab7152b063478.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib')
-rw-r--r-- | contrib/python/ydb/py3/.dist-info/METADATA | 8 | ||||
-rw-r--r-- | contrib/python/ydb/py3/README.md | 6 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ya.make | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py | 25 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/__init__.py | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/__init__.py | 10 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/pool.py | 112 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/session.py | 25 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/aio/query/transaction.py | 8 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/__init__.py | 14 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/base.py | 31 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/pool.py | 131 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/session.py | 40 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/transaction.py | 6 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/settings.py | 20 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/table.py | 4 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/ydb_version.py | 2 |
17 files changed, 353 insertions, 93 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 4397bad877..db2f0036b3 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.16.1 +Version: 3.17.1 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC @@ -30,6 +30,12 @@ YDB Python SDK Officially supported Python client for YDB. +--- + +**Documentation**: <a href="https://ydb-platform.github.io/ydb-python-sdk" target="_blank">https://ydb-platform.github.io/ydb-python-sdk</a> + +--- + ## Quickstart ### Prerequisites diff --git a/contrib/python/ydb/py3/README.md b/contrib/python/ydb/py3/README.md index cfc57eb276..db7c3de271 100644 --- a/contrib/python/ydb/py3/README.md +++ b/contrib/python/ydb/py3/README.md @@ -7,6 +7,12 @@ YDB Python SDK Officially supported Python client for YDB. +--- + +**Documentation**: <a href="https://ydb-platform.github.io/ydb-python-sdk" target="_blank">https://ydb-platform.github.io/ydb-python-sdk</a> + +--- + ## Quickstart ### Prerequisites diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index 8c6877ee58..c1ab6d4472 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.16.1) +VERSION(3.17.1) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py index 3ef2d55430..0b5ec41df7 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_query_public_types.py @@ -10,6 +10,8 @@ except ImportError: class BaseQueryTxMode(IToProto): + """Abstract class for Query Transaction Modes.""" + @property @abc.abstractmethod def name(self) -> str: @@ -17,6 +19,11 @@ class BaseQueryTxMode(IToProto): class QuerySnapshotReadOnly(BaseQueryTxMode): + """All the read operations within a transaction access the database snapshot. + All the data reads are consistent. The snapshot is taken when the transaction begins, + meaning the transaction sees all changes committed before it began. + """ + def __init__(self): self._name = "snapshot_read_only" @@ -29,6 +36,10 @@ class QuerySnapshotReadOnly(BaseQueryTxMode): class QuerySerializableReadWrite(BaseQueryTxMode): + """This mode guarantees that the result of successful parallel transactions is equivalent + to their serial execution, and there are no read anomalies for successful transactions. + """ + def __init__(self): self._name = "serializable_read_write" @@ -41,6 +52,15 @@ class QuerySerializableReadWrite(BaseQueryTxMode): class QueryOnlineReadOnly(BaseQueryTxMode): + """Each read operation in the transaction is reading the data that is most recent at execution time. + The consistency of retrieved data depends on the allow_inconsistent_reads setting: + * false (consistent reads): Each individual read operation returns consistent data, + but no consistency is guaranteed between reads. + Reading the same table range twice may return different results. + * true (inconsistent reads): Even the data fetched by a particular + read operation may contain inconsistent results. + """ + def __init__(self, allow_inconsistent_reads: bool = False): self.allow_inconsistent_reads = allow_inconsistent_reads self._name = "online_read_only" @@ -54,6 +74,11 @@ class QueryOnlineReadOnly(BaseQueryTxMode): class QueryStaleReadOnly(BaseQueryTxMode): + """Read operations within a transaction may return results that are slightly out-of-date + (lagging by fractions of a second). Each individual read returns consistent data, + but no consistency between different reads is guaranteed. + """ + def __init__(self): self._name = "stale_read_only" diff --git a/contrib/python/ydb/py3/ydb/aio/__init__.py b/contrib/python/ydb/py3/ydb/aio/__init__.py index 0e7d4e747a..1c9c887c22 100644 --- a/contrib/python/ydb/py3/ydb/aio/__init__.py +++ b/contrib/python/ydb/py3/ydb/aio/__init__.py @@ -1,3 +1,3 @@ from .driver import Driver # noqa from .table import SessionPool, retry_operation # noqa -from .query import QuerySessionPoolAsync, QuerySessionAsync # noqa +from .query import QuerySessionPool, QuerySession, QueryTxContext # noqa diff --git a/contrib/python/ydb/py3/ydb/aio/query/__init__.py b/contrib/python/ydb/py3/ydb/aio/query/__init__.py index 829d7b54cf..8e7dd4fdff 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/__init__.py +++ b/contrib/python/ydb/py3/ydb/aio/query/__init__.py @@ -1,7 +1,9 @@ __all__ = [ - "QuerySessionPoolAsync", - "QuerySessionAsync", + "QuerySessionPool", + "QuerySession", + "QueryTxContext", ] -from .pool import QuerySessionPoolAsync -from .session import QuerySessionAsync +from .pool import QuerySessionPool +from .session import QuerySession +from .transaction import QueryTxContext diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py index f91f7465e4..e8d53438fc 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/pool.py +++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py @@ -1,3 +1,4 @@ +import asyncio import logging from typing import ( Callable, @@ -6,7 +7,7 @@ from typing import ( ) from .session import ( - QuerySessionAsync, + QuerySession, ) from ...retries import ( RetrySettings, @@ -18,20 +19,85 @@ from ..._grpc.grpcwrapper import common_utils logger = logging.getLogger(__name__) -class QuerySessionPoolAsync: - """QuerySessionPoolAsync is an object to simplify operations with sessions of Query Service.""" +class QuerySessionPool: + """QuerySessionPool is an object to simplify operations with sessions of Query Service.""" - def __init__(self, driver: common_utils.SupportedDriverType): + def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100): """ :param driver: A driver instance + :param size: Size of session pool """ - logger.warning("QuerySessionPoolAsync is an experimental API, which could be changed.") + logger.warning("QuerySessionPool is an experimental API, which could be changed.") self._driver = driver + self._size = size + self._should_stop = asyncio.Event() + self._queue = asyncio.Queue() + self._current_size = 0 + self._waiters = 0 + self._loop = asyncio.get_running_loop() + + async def _create_new_session(self): + session = QuerySession(self._driver) + await session.create() + logger.debug(f"New session was created for pool. Session id: {session._state.session_id}") + return session + + async def acquire(self) -> QuerySession: + """WARNING: This API is experimental and could be changed. + + Acquire a session from Session Pool. + + :return A QuerySession object. + """ + + if self._should_stop.is_set(): + logger.error("An attempt to take session from closed session pool.") + raise RuntimeError("An attempt to take session from closed session pool.") + + session = None + try: + session = self._queue.get_nowait() + except asyncio.QueueEmpty: + pass + + if session is None and self._current_size == self._size: + queue_get = asyncio.ensure_future(self._queue.get()) + task_stop = asyncio.ensure_future(asyncio.ensure_future(self._should_stop.wait())) + done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED) + if task_stop in done: + queue_get.cancel() + raise RuntimeError("An attempt to take session from closed session pool.") + + task_stop.cancel() + session = queue_get.result() + + if session is not None: + if session._state.attached: + logger.debug(f"Acquired active session from queue: {session._state.session_id}") + return session + else: + self._current_size -= 1 + logger.debug(f"Acquired dead session from queue: {session._state.session_id}") + + logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.") + session = await self._create_new_session() + self._current_size += 1 + return session + + async def release(self, session: QuerySession) -> None: + """WARNING: This API is experimental and could be changed. + + Release a session back to Session Pool. + """ + + self._queue.put_nowait(session) + logger.debug("Session returned to queue: %s", session._state.session_id) def checkout(self) -> "SimpleQuerySessionCheckoutAsync": """WARNING: This API is experimental and could be changed. - Return a Session context manager, that opens session on enter and closes session on exit. + + Return a Session context manager, that acquires session on enter and releases session on exit. """ return SimpleQuerySessionCheckoutAsync(self) @@ -40,6 +106,7 @@ class QuerySessionPoolAsync: self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs ): """WARNING: This API is experimental and could be changed. + Special interface to execute a bunch of commands with session in a safe, retriable way. :param callee: A function, that works with session. @@ -65,6 +132,7 @@ class QuerySessionPoolAsync: **kwargs, ) -> List[convert.ResultSet]: """WARNING: This API is experimental and could be changed. + Special interface to execute a one-shot queries in a safe, retriable way. Note: this method loads all data from stream before return, do not use this method with huge read queries. @@ -85,8 +153,20 @@ class QuerySessionPoolAsync: return await retry_operation_async(wrapped_callee, retry_settings) - async def stop(self, timeout=None): - pass # TODO: implement + async def stop(self): + self._should_stop.set() + + tasks = [] + while True: + try: + session = self._queue.get_nowait() + tasks.append(session.delete()) + except asyncio.QueueEmpty: + break + + await asyncio.gather(*tasks) + + logger.debug("All session were deleted.") async def __aenter__(self): return self @@ -94,15 +174,21 @@ class QuerySessionPoolAsync: async def __aexit__(self, exc_type, exc_val, exc_tb): await self.stop() + def __del__(self): + if self._should_stop.is_set() or self._loop.is_closed(): + return + + self._loop.call_soon(self.stop) + class SimpleQuerySessionCheckoutAsync: - def __init__(self, pool: QuerySessionPoolAsync): + def __init__(self, pool: QuerySessionPool): self._pool = pool - self._session = QuerySessionAsync(pool._driver) + self._session = None - async def __aenter__(self) -> QuerySessionAsync: - await self._session.create() + async def __aenter__(self) -> QuerySession: + self._session = await self._pool.acquire() return self._session async def __aexit__(self, exc_type, exc_val, exc_tb): - await self._session.delete() + await self._pool.release(self._session) diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py index 627a41d895..4c1c1a10fc 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/session.py +++ b/contrib/python/ydb/py3/ydb/aio/query/session.py @@ -5,9 +5,10 @@ from typing import ( ) from .base import AsyncResponseContextIterator -from .transaction import QueryTxContextAsync +from .transaction import QueryTxContext from .. import _utilities from ... import issues +from ...settings import BaseRequestSettings from ..._grpc.grpcwrapper import common_utils from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public @@ -18,7 +19,7 @@ from ...query.session import ( ) -class QuerySessionAsync(BaseQuerySession): +class QuerySession(BaseQuerySession): """Session object for Query Service. It is not recommended to control session's lifecycle manually - use a QuerySessionPool is always a better choise. """ @@ -32,7 +33,7 @@ class QuerySessionAsync(BaseQuerySession): settings: Optional[base.QueryClientSettings] = None, loop: asyncio.AbstractEventLoop = None, ): - super(QuerySessionAsync, self).__init__(driver, settings) + super(QuerySession, self).__init__(driver, settings) self._loop = loop if loop is not None else asyncio.get_running_loop() async def _attach(self) -> None: @@ -62,7 +63,7 @@ class QuerySessionAsync(BaseQuerySession): self._state.reset() self._state._change_state(QuerySessionStateEnum.CLOSED) - async def delete(self) -> None: + async def delete(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Deletes a Session of Query Service on server side and releases resources. @@ -73,30 +74,30 @@ class QuerySessionAsync(BaseQuerySession): return self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED) - await self._delete_call() + await self._delete_call(settings=settings) self._stream.cancel() - async def create(self) -> "QuerySessionAsync": + async def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySession": """WARNING: This API is experimental and could be changed. Creates a Session of Query Service on server side and attaches it. - :return: QuerySessionSync object. + :return: QuerySession object. """ if self._state._already_in(QuerySessionStateEnum.CREATED): return self._state._check_invalid_transition(QuerySessionStateEnum.CREATED) - await self._create_call() + await self._create_call(settings=settings) await self._attach() return self - def transaction(self, tx_mode=None) -> QueryTxContextAsync: + def transaction(self, tx_mode=None) -> QueryTxContext: self._state._check_session_ready_to_use() tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() - return QueryTxContextAsync( + return QueryTxContext( self._driver, self._state, self, @@ -110,10 +111,12 @@ class QuerySessionAsync(BaseQuerySession): syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, + settings: Optional[BaseRequestSettings] = None, ) -> AsyncResponseContextIterator: """WARNING: This API is experimental and could be changed. Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. :param syntax: Syntax of the query, which is a one from the following choises: 1) QuerySyntax.YQL_V1, which is default; @@ -132,6 +135,7 @@ class QuerySessionAsync(BaseQuerySession): exec_mode=exec_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, + settings=settings, ) return AsyncResponseContextIterator( @@ -139,6 +143,7 @@ class QuerySessionAsync(BaseQuerySession): lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, + session_state=self._state, settings=self._settings, ), ) diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py index 429ba125c8..b115a4b48b 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py @@ -15,8 +15,8 @@ from ...query.transaction import ( logger = logging.getLogger(__name__) -class QueryTxContextAsync(BaseQueryTxContext): - async def __aenter__(self) -> "QueryTxContextAsync": +class QueryTxContext(BaseQueryTxContext): + async def __aenter__(self) -> "QueryTxContext": """ Enters a context manager and returns a transaction @@ -47,7 +47,7 @@ class QueryTxContextAsync(BaseQueryTxContext): pass self._prev_stream = None - async def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContextAsync": + async def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContext": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction @@ -114,6 +114,7 @@ class QueryTxContextAsync(BaseQueryTxContext): """WARNING: This API is experimental and could be changed. Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. @@ -146,6 +147,7 @@ class QueryTxContextAsync(BaseQueryTxContext): lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, + session_state=self._session_state, tx=self, commit_tx=commit_tx, settings=self.session._settings, diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py index 40e512cd6b..0f8187892f 100644 --- a/contrib/python/ydb/py3/ydb/query/__init__.py +++ b/contrib/python/ydb/py3/ydb/query/__init__.py @@ -1,11 +1,13 @@ __all__ = [ + "BaseQueryTxMode", "QueryOnlineReadOnly", "QuerySerializableReadWrite", "QuerySnapshotReadOnly", "QueryStaleReadOnly", "QuerySessionPool", - "QueryClientSync", - "QuerySessionSync", + "QueryClientSettings", + "QuerySession", + "QueryTxContext", ] import logging @@ -14,10 +16,12 @@ from .base import ( QueryClientSettings, ) -from .session import QuerySessionSync +from .session import QuerySession +from .transaction import QueryTxContext from .._grpc.grpcwrapper import common_utils from .._grpc.grpcwrapper.ydb_query_public_types import ( + BaseQueryTxMode, QueryOnlineReadOnly, QuerySerializableReadWrite, QuerySnapshotReadOnly, @@ -35,5 +39,5 @@ class QueryClientSync: self._driver = driver self._settings = query_client_settings - def session(self) -> QuerySessionSync: - return QuerySessionSync(self._driver, self._settings) + def session(self) -> QuerySession: + return QuerySession(self._driver, self._settings) diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py index 55087d0c4e..9372cbcf54 100644 --- a/contrib/python/ydb/py3/ydb/query/base.py +++ b/contrib/python/ydb/py3/ydb/query/base.py @@ -165,28 +165,31 @@ def create_execute_query_request( ) +def bad_session_handler(func): + @functools.wraps(func) + def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args, **kwargs): + try: + return func(rpc_state, response_pb, session_state, *args, **kwargs) + except issues.BadSession: + session_state.reset() + raise + + return decorator + + +@bad_session_handler def wrap_execute_query_response( rpc_state: RpcState, response_pb: _apis.ydb_query.ExecuteQueryResponsePart, + session_state: IQuerySessionState, tx: Optional["BaseQueryTxContext"] = None, commit_tx: Optional[bool] = False, settings: Optional[QueryClientSettings] = None, ) -> convert.ResultSet: issues._process_response(response_pb) - if tx and response_pb.tx_meta and not tx.tx_id: - tx._move_to_beginned(response_pb.tx_meta.id) if tx and commit_tx: tx._move_to_commited() - return convert.ResultSet.from_message(response_pb.result_set, settings) - - -def bad_session_handler(func): - @functools.wraps(func) - def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args, **kwargs): - try: - return func(rpc_state, response_pb, session_state, *args, **kwargs) - except issues.BadSession: - session_state.reset() - raise + elif tx and response_pb.tx_meta and not tx.tx_id: + tx._move_to_beginned(response_pb.tx_meta.id) - return decorator + return convert.ResultSet.from_message(response_pb.result_set, settings) diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index afe39f0623..839d8688ca 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -4,15 +4,20 @@ from typing import ( Optional, List, ) +import time +import threading +import queue from .session import ( - QuerySessionSync, + QuerySession, ) from ..retries import ( RetrySettings, retry_operation_sync, ) +from .. import issues from .. import convert +from ..settings import BaseRequestSettings from .._grpc.grpcwrapper import common_utils @@ -22,23 +27,102 @@ logger = logging.getLogger(__name__) class QuerySessionPool: """QuerySessionPool is an object to simplify operations with sessions of Query Service.""" - def __init__(self, driver: common_utils.SupportedDriverType): + def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100): """ - :param driver: A driver instance + :param driver: A driver instance. + :param size: Max size of Session Pool. """ logger.warning("QuerySessionPool is an experimental API, which could be changed.") self._driver = driver + self._queue = queue.Queue() + self._current_size = 0 + self._size = size + self._should_stop = threading.Event() + self._lock = threading.RLock() + + def _create_new_session(self, timeout: Optional[float]): + session = QuerySession(self._driver) + session.create(settings=BaseRequestSettings().with_timeout(timeout)) + logger.debug(f"New session was created for pool. Session id: {session._state.session_id}") + return session + + def acquire(self, timeout: Optional[float] = None) -> QuerySession: + """WARNING: This API is experimental and could be changed. + + Acquire a session from Session Pool. + + :param timeout: A timeout to wait in seconds. + + :return A QuerySession object. + """ + + start = time.monotonic() + + lock_acquire_timeout = timeout if timeout is not None else -1 + acquired = self._lock.acquire(timeout=lock_acquire_timeout) + try: + if self._should_stop.is_set(): + logger.error("An attempt to take session from closed session pool.") + raise RuntimeError("An attempt to take session from closed session pool.") + + session = None + try: + session = self._queue.get_nowait() + except queue.Empty: + pass + + finish = time.monotonic() + timeout = timeout - (finish - start) if timeout is not None else None + + start = time.monotonic() + if session is None and self._current_size == self._size: + try: + session = self._queue.get(block=True, timeout=timeout) + except queue.Empty: + raise issues.SessionPoolEmpty("Timeout on acquire session") + + if session is not None: + if session._state.attached: + logger.debug(f"Acquired active session from queue: {session._state.session_id}") + return session + else: + self._current_size -= 1 + logger.debug(f"Acquired dead session from queue: {session._state.session_id}") + + logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.") + finish = time.monotonic() + time_left = timeout - (finish - start) if timeout is not None else None + session = self._create_new_session(time_left) + + self._current_size += 1 + return session + finally: + if acquired: + self._lock.release() + + def release(self, session: QuerySession) -> None: + """WARNING: This API is experimental and could be changed. + + Release a session back to Session Pool. + """ + + self._queue.put_nowait(session) + logger.debug("Session returned to queue: %s", session._state.session_id) - def checkout(self) -> "SimpleQuerySessionCheckout": + def checkout(self, timeout: Optional[float] = None) -> "SimpleQuerySessionCheckout": """WARNING: This API is experimental and could be changed. - Return a Session context manager, that opens session on enter and closes session on exit. + + Return a Session context manager, that acquires session on enter and releases session on exit. + + :param timeout: A timeout to wait in seconds. """ - return SimpleQuerySessionCheckout(self) + return SimpleQuerySessionCheckout(self, timeout) def retry_operation_sync(self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs): """WARNING: This API is experimental and could be changed. + Special interface to execute a bunch of commands with session in a safe, retriable way. :param callee: A function, that works with session. @@ -50,7 +134,7 @@ class QuerySessionPool: retry_settings = RetrySettings() if retry_settings is None else retry_settings def wrapped_callee(): - with self.checkout() as session: + with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session: return callee(session, *args, **kwargs) return retry_operation_sync(wrapped_callee, retry_settings) @@ -64,6 +148,7 @@ class QuerySessionPool: **kwargs, ) -> List[convert.ResultSet]: """WARNING: This API is experimental and could be changed. + Special interface to execute a one-shot queries in a safe, retriable way. Note: this method loads all data from stream before return, do not use this method with huge read queries. @@ -78,14 +163,28 @@ class QuerySessionPool: retry_settings = RetrySettings() if retry_settings is None else retry_settings def wrapped_callee(): - with self.checkout() as session: + with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session: it = session.execute(query, parameters, *args, **kwargs) return [result_set for result_set in it] return retry_operation_sync(wrapped_callee, retry_settings) def stop(self, timeout=None): - pass # TODO: implement + acquire_timeout = timeout if timeout is not None else -1 + acquired = self._lock.acquire(timeout=acquire_timeout) + try: + self._should_stop.set() + while True: + try: + session = self._queue.get_nowait() + session.delete() + except queue.Empty: + break + + logger.debug("All session were deleted.") + finally: + if acquired: + self._lock.release() def __enter__(self): return self @@ -93,15 +192,19 @@ class QuerySessionPool: def __exit__(self, exc_type, exc_val, exc_tb): self.stop() + def __del__(self): + self.stop() + class SimpleQuerySessionCheckout: - def __init__(self, pool: QuerySessionPool): + def __init__(self, pool: QuerySessionPool, timeout: Optional[float]): self._pool = pool - self._session = QuerySessionSync(pool._driver) + self._timeout = timeout + self._session = None - def __enter__(self) -> QuerySessionSync: - self._session.create() + def __enter__(self) -> QuerySession: + self._session = self._pool.acquire(self._timeout) return self._session def __exit__(self, exc_type, exc_val, exc_tb): - self._session.delete() + self._pool.release(self._session) diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py index 4b051dc16f..5b4db26c92 100644 --- a/contrib/python/ydb/py3/ydb/query/session.py +++ b/contrib/python/ydb/py3/ydb/query/session.py @@ -10,12 +10,13 @@ from typing import ( from . import base from .. import _apis, issues, _utilities +from ..settings import BaseRequestSettings from ..connection import _RpcState as RpcState from .._grpc.grpcwrapper import common_utils from .._grpc.grpcwrapper import ydb_query as _ydb_query from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public -from .transaction import QueryTxContextSync +from .transaction import QueryTxContext logger = logging.getLogger(__name__) @@ -136,29 +137,32 @@ class BaseQuerySession: self._settings = settings if settings is not None else base.QueryClientSettings() self._state = QuerySessionState(settings) - def _create_call(self) -> "BaseQuerySession": + def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession": return self._driver( _apis.ydb_query.CreateSessionRequest(), _apis.QueryService.Stub, _apis.QueryService.CreateSession, wrap_result=wrapper_create_session, wrap_args=(self._state, self), + settings=settings, ) - def _delete_call(self) -> "BaseQuerySession": + def _delete_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession": return self._driver( _apis.ydb_query.DeleteSessionRequest(session_id=self._state.session_id), _apis.QueryService.Stub, _apis.QueryService.DeleteSession, wrap_result=wrapper_delete_session, wrap_args=(self._state, self), + settings=settings, ) - def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]: + def _attach_call(self, settings: Optional[BaseRequestSettings] = None) -> Iterable[_apis.ydb_query.SessionState]: return self._driver( _apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id), _apis.QueryService.Stub, _apis.QueryService.AttachSession, + settings=settings, ) def _execute_call( @@ -169,6 +173,7 @@ class BaseQuerySession: exec_mode: base.QueryExecMode = None, parameters: dict = None, concurrent_result_sets: bool = False, + settings: Optional[BaseRequestSettings] = None, ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: request = base.create_execute_query_request( query=query, @@ -186,18 +191,19 @@ class BaseQuerySession: request.to_proto(), _apis.QueryService.Stub, _apis.QueryService.ExecuteQuery, + settings=settings, ) -class QuerySessionSync(BaseQuerySession): +class QuerySession(BaseQuerySession): """Session object for Query Service. It is not recommended to control session's lifecycle manually - use a QuerySessionPool is always a better choise. """ _stream = None - def _attach(self) -> None: - self._stream = self._attach_call() + def _attach(self, settings: Optional[BaseRequestSettings] = None) -> None: + self._stream = self._attach_call(settings=settings) status_stream = _utilities.SyncResponseIterator( self._stream, lambda response: common_utils.ServerStatus.from_proto(response), @@ -228,7 +234,7 @@ class QuerySessionSync(BaseQuerySession): self._state.reset() self._state._change_state(QuerySessionStateEnum.CLOSED) - def delete(self) -> None: + def delete(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Deletes a Session of Query Service on server side and releases resources. @@ -239,29 +245,31 @@ class QuerySessionSync(BaseQuerySession): return self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED) - self._delete_call() + self._delete_call(settings=settings) self._stream.cancel() - def create(self) -> "QuerySessionSync": + def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySession": """WARNING: This API is experimental and could be changed. Creates a Session of Query Service on server side and attaches it. - :return: QuerySessionSync object. + :return: QuerySession object. """ if self._state._already_in(QuerySessionStateEnum.CREATED): return self._state._check_invalid_transition(QuerySessionStateEnum.CREATED) - self._create_call() + + self._create_call(settings=settings) self._attach() return self - def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContextSync: + def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContext: """WARNING: This API is experimental and could be changed. Creates a transaction context manager with specified transaction mode. + :param tx_mode: Transaction mode, which is a one from the following choises: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); @@ -275,7 +283,7 @@ class QuerySessionSync(BaseQuerySession): tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() - return QueryTxContextSync( + return QueryTxContext( self._driver, self._state, self, @@ -289,10 +297,12 @@ class QuerySessionSync(BaseQuerySession): syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, + settings: Optional[BaseRequestSettings] = None, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. :param syntax: Syntax of the query, which is a one from the following choises: 1) QuerySyntax.YQL_V1, which is default; @@ -311,6 +321,7 @@ class QuerySessionSync(BaseQuerySession): exec_mode=exec_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, + settings=settings, ) return base.SyncResponseContextIterator( @@ -318,6 +329,7 @@ class QuerySessionSync(BaseQuerySession): lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, + session_state=self._state, settings=self._settings, ), ) diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py index be7396b1a5..21ba02798b 100644 --- a/contrib/python/ydb/py3/ydb/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/query/transaction.py @@ -294,7 +294,7 @@ class BaseQueryTxContext: self._tx_state._change_state(QueryTxStateEnum.COMMITTED) -class QueryTxContextSync(BaseQueryTxContext): +class QueryTxContext(BaseQueryTxContext): def __enter__(self) -> "BaseQueryTxContext": """ Enters a context manager and returns a transaction @@ -326,7 +326,7 @@ class QueryTxContextSync(BaseQueryTxContext): pass self._prev_stream = None - def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContextSync": + def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContext": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction @@ -394,6 +394,7 @@ class QueryTxContextSync(BaseQueryTxContext): """WARNING: This API is experimental and could be changed. Sends a query to Query Service + :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. @@ -427,6 +428,7 @@ class QueryTxContextSync(BaseQueryTxContext): lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, + session_state=self._session_state, tx=self, commit_tx=commit_tx, settings=self.session._settings, diff --git a/contrib/python/ydb/py3/ydb/settings.py b/contrib/python/ydb/py3/ydb/settings.py index 6739a46fab..019b75a8ec 100644 --- a/contrib/python/ydb/py3/ydb/settings.py +++ b/contrib/python/ydb/py3/ydb/settings.py @@ -39,7 +39,7 @@ class BaseRequestSettings(object): .with_need_rpc_auth(self.need_rpc_auth) ) - def with_compression(self, compression): + def with_compression(self, compression) -> "BaseRequestSettings": """ Enables compression for the specific RPC :param compression: An RPCCompression enum value. @@ -48,11 +48,11 @@ class BaseRequestSettings(object): self.compression = compression return self - def with_need_rpc_auth(self, need_rpc_auth): + def with_need_rpc_auth(self, need_rpc_auth) -> "BaseRequestSettings": self.need_rpc_auth = need_rpc_auth return self - def with_header(self, key, value): + def with_header(self, key, value) -> "BaseRequestSettings": """ Adds a key-value pair to the request headers. :param key: A string with a header key. @@ -62,7 +62,7 @@ class BaseRequestSettings(object): self.headers.append((key, value)) return self - def with_trace_id(self, trace_id): + def with_trace_id(self, trace_id) -> "BaseRequestSettings": """ Includes trace id for RPC headers :param trace_id: A trace id string @@ -71,7 +71,7 @@ class BaseRequestSettings(object): self.trace_id = trace_id return self - def with_request_type(self, request_type): + def with_request_type(self, request_type) -> "BaseRequestSettings": """ Includes request type for RPC headers :param request_type: A request type string @@ -80,7 +80,7 @@ class BaseRequestSettings(object): self.request_type = request_type return self - def with_operation_timeout(self, timeout): + def with_operation_timeout(self, timeout) -> "BaseRequestSettings": """ Indicates that client is no longer interested in the result of operation after the specified duration starting from the time operation arrives at the server. @@ -89,12 +89,12 @@ class BaseRequestSettings(object): Timeout of operation does not tell anything about its result, it might be completed successfully or cancelled on server. :param timeout: - :return: + :return: The self instance """ self.operation_timeout = timeout return self - def with_cancel_after(self, timeout): + def with_cancel_after(self, timeout) -> "BaseRequestSettings": """ Server will try to cancel the operation after the specified duration starting from the time the operation arrives at server. @@ -102,12 +102,12 @@ class BaseRequestSettings(object): sent back to client if it was waiting for the operation result. In case when cancellation isn't possible, no action will be performed. :param timeout: - :return: + :return: The self instance """ self.cancel_after = timeout return self - def with_timeout(self, timeout): + def with_timeout(self, timeout) -> "BaseRequestSettings": """ Client-side timeout to complete request. Since YDB doesn't support request cancellation at this moment, this feature should be diff --git a/contrib/python/ydb/py3/ydb/table.py b/contrib/python/ydb/py3/ydb/table.py index ac9f93042c..cfcffb17af 100644 --- a/contrib/python/ydb/py3/ydb/table.py +++ b/contrib/python/ydb/py3/ydb/table.py @@ -297,6 +297,10 @@ class TableIndex(object): self._pb.global_index.SetInParent() return self + def with_global_async_index(self): + self._pb.global_async_index.SetInParent() + return self + def with_index_columns(self, *columns): for column in columns: self._pb.index_columns.append(column) diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 96a1189198..b0ef9f368d 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.16.1" +VERSION = "3.17.1" |