summaryrefslogtreecommitdiffstats
path: root/contrib/python
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-07-02 12:53:14 +0300
committerrobot-piglet <[email protected]>2025-07-02 13:02:06 +0300
commitdbe1b39bd441dc9ae17f4d4879ce1b3f36000157 (patch)
tree2b2c35e345096db5e0c081359e4e035cc7f1a3dd /contrib/python
parent7254af9393639e2abb0986cc9d84c4cfd37ae1f1 (diff)
Intermediate changes
commit_hash:c14b9dac6c17e532eb9edba5e3247d73f6994c6e
Diffstat (limited to 'contrib/python')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA2
-rw-r--r--contrib/python/ydb/py3/ya.make2
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py55
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py2
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py6
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py85
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py19
-rw-r--r--contrib/python/ydb/py3/ydb/connection.py22
-rw-r--r--contrib/python/ydb/py3/ydb/issues.py8
-rw-r--r--contrib/python/ydb/py3/ydb/query/base.py63
-rw-r--r--contrib/python/ydb/py3/ydb/query/pool.py70
-rw-r--r--contrib/python/ydb/py3/ydb/topic.py35
-rw-r--r--contrib/python/ydb/py3/ydb/types.py6
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
14 files changed, 321 insertions, 56 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 065349ff4c5..b93ac851639 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.21.4
+Version: 3.21.6
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index ec6ff9104d0..7e224e5c310 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.21.4)
+VERSION(3.21.6)
LICENSE(Apache-2.0)
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
index 24e8fa9ec0a..7baadacb3e0 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -99,7 +99,7 @@ class PublicAsyncIOReader:
def __del__(self):
if not self._closed:
try:
- logger.warning("Topic reader was not closed properly. Consider using method close().")
+ logger.debug("Topic reader was not closed properly. Consider using method close().")
task = self._loop.create_task(self.close(flush=False))
topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader")
except BaseException:
@@ -121,6 +121,7 @@ class PublicAsyncIOReader:
use asyncio.wait_for for wait with timeout.
"""
+ logger.debug("receive_batch max_messages=%s", max_messages)
await self._reconnector.wait_message()
return self._reconnector.receive_batch_nowait(
max_messages=max_messages,
@@ -137,6 +138,7 @@ class PublicAsyncIOReader:
use asyncio.wait_for for wait with timeout.
"""
+ logger.debug("receive_batch_with_tx tx=%s max_messages=%s", tx, max_messages)
await self._reconnector.wait_message()
return self._reconnector.receive_batch_with_tx_nowait(
tx=tx,
@@ -149,6 +151,7 @@ class PublicAsyncIOReader:
use asyncio.wait_for for wait with timeout.
"""
+ logger.debug("receive_message")
await self._reconnector.wait_message()
return self._reconnector.receive_message_nowait()
@@ -159,6 +162,7 @@ class PublicAsyncIOReader:
For the method no way check the commit result
(for example if lost connection - commits will not re-send and committed messages will receive again).
"""
+ logger.debug("commit message or batch")
if self._settings.consumer is None:
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
@@ -177,6 +181,7 @@ class PublicAsyncIOReader:
before receive commit ack. Message may be acked or not (if not - it will send in other read session,
to this or other reader).
"""
+ logger.debug("commit_with_ack message or batch")
if self._settings.consumer is None:
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
@@ -187,8 +192,10 @@ class PublicAsyncIOReader:
if self._closed:
raise TopicReaderClosedError()
+ logger.debug("Close topic reader")
self._closed = True
await self._reconnector.close(flush)
+ logger.debug("Topic reader was closed")
@property
def read_session_id(self) -> Optional[str]:
@@ -214,11 +221,12 @@ class ReaderReconnector:
settings: topic_reader.PublicReaderSettings,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
- self._id = self._static_reader_reconnector_counter.inc_and_get()
+ self._id = ReaderReconnector._static_reader_reconnector_counter.inc_and_get()
self._settings = settings
self._driver = driver
self._loop = loop if loop is not None else asyncio.get_running_loop()
self._background_tasks = set()
+ logger.debug("init reader reconnector id=%s", self._id)
self._state_changed = asyncio.Event()
self._stream_reader = None
@@ -231,13 +239,16 @@ class ReaderReconnector:
attempt = 0
while True:
try:
+ logger.debug("reader %s connect attempt %s", self._id, attempt)
self._stream_reader = await ReaderStream.create(self._id, self._driver, self._settings)
+ logger.debug("reader %s connected stream %s", self._id, self._stream_reader._id)
attempt = 0
self._state_changed.set()
await self._stream_reader.wait_error()
except BaseException as err:
retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt)
if not retry_info.is_retriable:
+ logger.debug("reader %s stop connection loop due to %s", self._id, err)
self._set_first_error(err)
return
@@ -358,6 +369,7 @@ class ReaderReconnector:
return self._stream_reader.commit(batch)
async def close(self, flush: bool):
+ logger.debug("reader reconnector %s close", self._id)
if self._stream_reader:
await self._stream_reader.close(flush)
for task in self._background_tasks:
@@ -447,6 +459,8 @@ class ReaderStream:
self._settings = settings
+ logger.debug("created ReaderStream id=%s reconnector=%s", self._id, self._reader_reconnector_id)
+
@staticmethod
async def create(
reader_reconnector_id: int,
@@ -464,6 +478,7 @@ class ReaderStream:
get_token_function=creds.get_auth_token if creds else None,
)
await reader._start(stream, settings._init_message())
+ logger.debug("reader stream %s started session=%s", reader._id, reader._session_id)
return reader
async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest):
@@ -472,11 +487,13 @@ class ReaderStream:
self._started = True
self._stream = stream
+ logger.debug("reader stream %s send init request", self._id)
stream.write(StreamReadMessage.FromClient(client_message=init_message))
init_response = await stream.receive() # type: StreamReadMessage.FromServer
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
self._session_id = init_response.server_message.session_id
+ logger.debug("reader stream %s initialized session=%s", self._id, self._session_id)
else:
raise TopicReaderError("Unexpected message after InitRequest: %s", init_response)
@@ -615,6 +632,7 @@ class ReaderStream:
async def _read_messages_loop(self):
try:
+ logger.debug("reader stream %s start read loop", self._id)
self._stream.write(
StreamReadMessage.FromClient(
client_message=StreamReadMessage.ReadRequest(
@@ -628,6 +646,7 @@ class ReaderStream:
_process_response(message.server_status)
if isinstance(message.server_message, StreamReadMessage.ReadResponse):
+ logger.debug("reader stream %s read %s bytes", self._id, message.server_message.bytes_size)
self._on_read_response(message.server_message)
elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
@@ -637,18 +656,33 @@ class ReaderStream:
message.server_message,
StreamReadMessage.StartPartitionSessionRequest,
):
+ logger.debug(
+ "reader stream %s start partition %s",
+ self._id,
+ message.server_message.partition_session.partition_session_id,
+ )
await self._on_start_partition_session(message.server_message)
elif isinstance(
message.server_message,
StreamReadMessage.StopPartitionSessionRequest,
):
+ logger.debug(
+ "reader stream %s stop partition %s",
+ self._id,
+ message.server_message.partition_session_id,
+ )
self._on_partition_session_stop(message.server_message)
elif isinstance(
message.server_message,
StreamReadMessage.EndPartitionSession,
):
+ logger.debug(
+ "reader stream %s end partition %s",
+ self._id,
+ message.server_message.partition_session_id,
+ )
self._on_end_partition_session(message.server_message)
elif isinstance(message.server_message, UpdateTokenResponse):
@@ -663,6 +697,7 @@ class ReaderStream:
self._state_changed.set()
except Exception as e:
+ logger.debug("reader stream %s error: %s", self._id, e)
self._set_first_error(e)
return
@@ -825,6 +860,7 @@ class ReaderStream:
async def _decode_batches_loop(self):
while True:
batch = await self._batches_to_decode.get()
+ logger.debug("reader stream %s decode batch %s messages", self._id, len(batch.messages))
await self._decode_batch_inplace(batch)
self._add_batch_to_queue(batch)
self._state_changed.set()
@@ -833,9 +869,21 @@ class ReaderStream:
part_sess_id = batch._partition_session.id
if part_sess_id in self._message_batches:
self._message_batches[part_sess_id]._extend(batch)
+ logger.debug(
+ "reader stream %s extend batch partition=%s size=%s",
+ self._id,
+ part_sess_id,
+ len(batch.messages),
+ )
return
self._message_batches[part_sess_id] = batch
+ logger.debug(
+ "reader stream %s new batch partition=%s size=%s",
+ self._id,
+ part_sess_id,
+ len(batch.messages),
+ )
async def _decode_batch_inplace(self, batch):
if batch._codec == Codec.CODEC_RAW:
@@ -882,6 +930,7 @@ class ReaderStream:
return
self._closed = True
+ logger.debug("reader stream %s close", self._id)
if flush:
await self.flush()
@@ -899,3 +948,5 @@ class ReaderStream:
if self._background_tasks:
await asyncio.wait(self._background_tasks)
+
+ logger.debug("reader stream %s was closed", self._id)
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py
index f7590a2195c..3eea0390f7d 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py
@@ -64,7 +64,7 @@ class TopicReaderSync:
def __del__(self):
if not self._closed:
try:
- logger.warning("Topic reader was not closed properly. Consider using method close().")
+ logger.debug("Topic reader was not closed properly. Consider using method close().")
self.close(flush=False)
except BaseException:
logger.warning("Something went wrong during reader close in __del__")
diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py
index a3e407ed86d..16feded7771 100644
--- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py
+++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py
@@ -213,10 +213,10 @@ def messages_to_proto_requests(
tx_identity: Optional[TransactionIdentity],
) -> List[StreamWriteMessage.FromClient]:
- gropus = _slit_messages_for_send(messages)
+ groups = _split_messages_for_send(messages)
res = [] # type: List[StreamWriteMessage.FromClient]
- for group in gropus:
+ for group in groups:
req = StreamWriteMessage.FromClient(
StreamWriteMessage.WriteRequest(
messages=list(map(InternalMessage.to_message_data, group)),
@@ -254,7 +254,7 @@ _message_data_overhead = (
)
-def _slit_messages_for_send(
+def _split_messages_for_send(
messages: List[InternalMessage],
) -> List[List[InternalMessage]]:
codec_groups = [] # type: List[List[InternalMessage]]
diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
index ec5b21661d4..eeecbfd2e81 100644
--- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
@@ -26,6 +26,7 @@ from .. import (
_apis,
issues,
)
+from .._utilities import AtomicCounter
from .._errors import check_retriable_error
from .._topic_common import common as topic_common
from ..retries import RetrySettings
@@ -82,7 +83,7 @@ class WriterAsyncIO:
if self._closed or self._loop.is_closed():
return
try:
- logger.warning("Topic writer was not closed properly. Consider using method close().")
+ logger.debug("Topic writer was not closed properly. Consider using method close().")
task = self._loop.create_task(self.close(flush=False))
topic_common.wrap_set_name_for_asyncio_task(task, task_name="close writer")
except BaseException:
@@ -92,9 +93,11 @@ class WriterAsyncIO:
if self._closed:
return
+ logger.debug("Close topic writer")
self._closed = True
await self._reconnector.close(flush)
+ logger.debug("Topic writer was closed")
async def write_with_ack(
self,
@@ -108,6 +111,10 @@ class WriterAsyncIO:
For wait with timeout use asyncio.wait_for.
"""
+ logger.debug(
+ "write_with_ack %s messages",
+ len(messages) if isinstance(messages, list) else 1,
+ )
futures = await self.write_with_ack_future(messages)
if not isinstance(futures, list):
futures = [futures]
@@ -129,6 +136,10 @@ class WriterAsyncIO:
For wait with timeout use asyncio.wait_for.
"""
+ logger.debug(
+ "write_with_ack_future %s messages",
+ len(messages) if isinstance(messages, list) else 1,
+ )
input_single_message = not isinstance(messages, list)
converted_messages = []
if isinstance(messages, list):
@@ -153,6 +164,10 @@ class WriterAsyncIO:
For wait with timeout use asyncio.wait_for.
"""
+ logger.debug(
+ "write %s messages",
+ len(messages) if isinstance(messages, list) else 1,
+ )
await self.write_with_ack_future(messages)
async def flush(self):
@@ -162,6 +177,7 @@ class WriterAsyncIO:
For wait with timeout use asyncio.wait_for.
"""
+ logger.debug("flush writer")
return await self._reconnector.flush()
async def wait_init(self) -> PublicWriterInitInfo:
@@ -170,6 +186,7 @@ class WriterAsyncIO:
For wait with timeout use asyncio.wait_for()
"""
+ logger.debug("wait writer init")
return await self._reconnector.wait_init()
@@ -225,6 +242,8 @@ class TxWriterAsyncIO(WriterAsyncIO):
class WriterAsyncIOReconnector:
+ _static_id_counter = AtomicCounter()
+
_closed: bool
_loop: asyncio.AbstractEventLoop
_credentials: Union[ydb.credentials.Credentials, None]
@@ -260,6 +279,7 @@ class WriterAsyncIOReconnector:
self, driver: SupportedDriverType, settings: WriterSettings, tx: Optional["BaseQueryTxContext"] = None
):
self._closed = False
+ self._id = WriterAsyncIOReconnector._static_id_counter.inc_and_get()
self._loop = asyncio.get_running_loop()
self._driver = driver
self._credentials = driver._credentials
@@ -307,12 +327,13 @@ class WriterAsyncIOReconnector:
]
self._state_changed = asyncio.Event()
+ logger.debug("init writer reconnector id=%s", self._id)
async def close(self, flush: bool):
if self._closed:
return
self._closed = True
- logger.debug("Close writer reconnector")
+ logger.debug("Close writer reconnector id=%s", self._id)
if flush:
await self.flush()
@@ -329,6 +350,8 @@ class WriterAsyncIOReconnector:
except TopicWriterStopped:
pass
+ logger.debug("Writer reconnector id=%s was closed", self._id)
+
async def wait_init(self) -> PublicWriterInitInfo:
while True:
if self._stop_reason.done():
@@ -418,6 +441,7 @@ class WriterAsyncIOReconnector:
# noinspection PyBroadException
stream_writer = None
try:
+ logger.debug("writer reconnector %s connect attempt %s", self._id, attempt)
tx_identity = None if self._tx is None else self._tx._tx_identity()
stream_writer = await WriterAsyncIOStream.create(
self._driver,
@@ -425,6 +449,11 @@ class WriterAsyncIOReconnector:
self._settings.update_token_interval,
tx_identity=tx_identity,
)
+ logger.debug(
+ "writer reconnector %s connected stream %s",
+ self._id,
+ stream_writer._id,
+ )
try:
if self._init_info is None:
self._last_known_seq_no = stream_writer.last_seqno
@@ -458,6 +487,11 @@ class WriterAsyncIOReconnector:
return
await asyncio.sleep(err_info.sleep_timeout_seconds)
+ logger.debug(
+ "writer reconnector %s retry in %s seconds",
+ self._id,
+ err_info.sleep_timeout_seconds,
+ )
except (asyncio.CancelledError, Exception) as err:
self._stop(err)
@@ -477,6 +511,12 @@ class WriterAsyncIOReconnector:
while not self._messages_for_encode.empty():
messages.extend(self._messages_for_encode.get_nowait())
+ logger.debug(
+ "writer reconnector %s encode %s messages",
+ self._id,
+ len(messages),
+ )
+
batch_codec = await self._codec_selector(messages)
await self._encode_data_inplace(batch_codec, messages)
self._add_messages_to_send_queue(messages)
@@ -582,6 +622,8 @@ class WriterAsyncIOReconnector:
while True:
resp = await writer.receive()
+ logger.debug("writer reconnector %s received %s acks", self._id, len(resp.acks))
+
for ack in resp.acks:
self._handle_receive_ack(ack)
@@ -604,20 +646,37 @@ class WriterAsyncIOReconnector:
else:
raise TopicWriterError("internal error - receive unexpected ack message.")
message_future.set_result(result)
+ logger.debug(
+ "writer reconnector %s ack seqno=%s result=%s",
+ self._id,
+ ack.seq_no,
+ type(result).__name__,
+ )
async def _send_loop(self, writer: "WriterAsyncIOStream"):
try:
+ logger.debug("writer reconnector %s send loop start", self._id)
messages = list(self._messages)
last_seq_no = 0
for m in messages:
writer.write([m])
+ logger.debug(
+ "writer reconnector %s sent buffered message seqno=%s",
+ self._id,
+ m.seq_no,
+ )
last_seq_no = m.seq_no
while True:
m = await self._new_messages.get() # type: InternalMessage
if m.seq_no > last_seq_no:
writer.write([m])
+ logger.debug(
+ "writer reconnector %s sent message seqno=%s",
+ self._id,
+ m.seq_no,
+ )
except asyncio.CancelledError:
# the loop task cancelled be parent code, for example for reconnection
# no need to stop all work.
@@ -639,7 +698,7 @@ class WriterAsyncIOReconnector:
f.set_exception(reason)
self._state_changed.set()
- logger.info("Stop topic writer: %s" % reason)
+ logger.info("Stop topic writer %s: %s" % (self._id, reason))
async def flush(self):
if not self._messages_future:
@@ -650,6 +709,8 @@ class WriterAsyncIOReconnector:
class WriterAsyncIOStream:
+ _static_id_counter = AtomicCounter()
+
# todo slots
_closed: bool
@@ -674,6 +735,7 @@ class WriterAsyncIOStream:
tx_identity: Optional[TransactionIdentity] = None,
):
self._closed = False
+ self._id = WriterAsyncIOStream._static_id_counter.inc_and_get()
self._update_token_interval = update_token_interval
self._get_token_function = get_token_function
@@ -686,12 +748,14 @@ class WriterAsyncIOStream:
if self._closed:
return
self._closed = True
+ logger.debug("writer stream %s close", self._id)
if self._update_token_task:
self._update_token_task.cancel()
await asyncio.wait([self._update_token_task])
self._stream.close()
+ logger.debug("writer stream %s was closed", self._id)
@staticmethod
async def create(
@@ -711,6 +775,11 @@ class WriterAsyncIOStream:
tx_identity=tx_identity,
)
await writer._start(stream, init_request)
+ logger.debug(
+ "writer stream %s started seqno=%s",
+ writer._id,
+ writer.last_seqno,
+ )
return writer
async def receive(self) -> StreamWriteMessage.WriteResponse:
@@ -727,6 +796,7 @@ class WriterAsyncIOStream:
raise Exception("Unknown message while read writer answers: %s" % item)
async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMessage.InitRequest):
+ logger.debug("writer stream %s send init request", self._id)
stream.write(StreamWriteMessage.FromClient(init_message))
resp = await stream.receive()
@@ -736,6 +806,11 @@ class WriterAsyncIOStream:
self.last_seqno = resp.last_seq_no
self.supported_codecs = [PublicCodec(codec) for codec in resp.supported_codecs]
+ logger.debug(
+ "writer stream %s init done last_seqno=%s",
+ self._id,
+ self.last_seqno,
+ )
self._stream = stream
@@ -755,6 +830,8 @@ class WriterAsyncIOStream:
if self._closed:
raise RuntimeError("Can not write on closed stream.")
+ logger.debug("writer stream %s send %s messages", self._id, len(messages))
+
for request in messages_to_proto_requests(messages, self._tx_identity):
self._stream.write(request)
@@ -764,6 +841,7 @@ class WriterAsyncIOStream:
token = self._get_token_function()
if asyncio.iscoroutine(token):
token = await token
+ logger.debug("writer stream %s update token", self._id)
await self._update_token(token=token)
async def _update_token(self, token: str):
@@ -771,5 +849,6 @@ class WriterAsyncIOStream:
try:
msg = StreamWriteMessage.FromClient(UpdateTokenRequest(token))
self._stream.write(msg)
+ logger.debug("writer stream %s token sent", self._id)
finally:
self._update_token_event.clear()
diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py
index 954864c9682..7806d7faba8 100644
--- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py
+++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py
@@ -76,7 +76,7 @@ class WriterSync:
def __del__(self):
if not self._closed:
try:
- logger.warning("Topic writer was not closed properly. Consider using method close().")
+ logger.debug("Topic writer was not closed properly. Consider using method close().")
self.close(flush=False)
except BaseException:
logger.warning("Something went wrong during writer close in __del__")
@@ -85,6 +85,7 @@ class WriterSync:
if self._closed:
return
+ logger.debug("Close topic writer")
self._closed = True
self._caller.safe_call_with_result(self._async_writer.close(flush=flush), timeout)
@@ -101,16 +102,22 @@ class WriterSync:
def flush(self, *, timeout=None):
self._check_closed()
+ logger.debug("flush writer")
+
return self._caller.unsafe_call_with_result(self._async_writer.flush(), timeout)
def async_wait_init(self) -> Future[PublicWriterInitInfo]:
self._check_closed()
+ logger.debug("wait writer init")
+
return self._caller.unsafe_call_with_future(self._async_writer.wait_init())
def wait_init(self, *, timeout: TimeoutType = None) -> PublicWriterInitInfo:
self._check_closed()
+ logger.debug("wait writer init")
+
return self._caller.unsafe_call_with_result(self._async_writer.wait_init(), timeout)
def write(
@@ -120,6 +127,11 @@ class WriterSync:
):
self._check_closed()
+ logger.debug(
+ "write %s messages",
+ len(messages) if isinstance(messages, list) else 1,
+ )
+
self._caller.safe_call_with_result(self._async_writer.write(messages), timeout)
def async_write_with_ack(
@@ -137,6 +149,11 @@ class WriterSync:
) -> Union[PublicWriteResult, List[PublicWriteResult]]:
self._check_closed()
+ logger.debug(
+ "write_with_ack %s messages",
+ len(messages) if isinstance(messages, list) else 1,
+ )
+
return self._caller.unsafe_call_with_result(self._async_writer.write_with_ack(messages), timeout=timeout)
diff --git a/contrib/python/ydb/py3/ydb/connection.py b/contrib/python/ydb/py3/ydb/connection.py
index 8e65cd3b833..d5b6ed50c69 100644
--- a/contrib/python/ydb/py3/ydb/connection.py
+++ b/contrib/python/ydb/py3/ydb/connection.py
@@ -26,6 +26,7 @@ YDB_TRACE_ID_HEADER = "x-ydb-trace-id"
YDB_REQUEST_TYPE_HEADER = "x-ydb-request-type"
_DEFAULT_MAX_GRPC_MESSAGE_SIZE = 64 * 10**6
+_DEFAULT_KEEPALIVE_TIMEOUT = 10000
def _message_to_string(message):
@@ -185,15 +186,18 @@ def _construct_channel_options(driver_config, endpoint_options=None):
getattr(driver_config, "grpc_lb_policy_name", "round_robin"),
),
]
- if driver_config.grpc_keep_alive_timeout is not None:
- _default_connect_options.extend(
- [
- ("grpc.keepalive_time_ms", driver_config.grpc_keep_alive_timeout >> 3),
- ("grpc.keepalive_timeout_ms", driver_config.grpc_keep_alive_timeout),
- ("grpc.http2.max_pings_without_data", 0),
- ("grpc.keepalive_permit_without_calls", 0),
- ]
- )
+ if driver_config.grpc_keep_alive_timeout is None:
+ driver_config.grpc_keep_alive_timeout = _DEFAULT_KEEPALIVE_TIMEOUT
+
+ _default_connect_options.extend(
+ [
+ ("grpc.keepalive_time_ms", driver_config.grpc_keep_alive_timeout >> 3),
+ ("grpc.keepalive_timeout_ms", driver_config.grpc_keep_alive_timeout),
+ ("grpc.http2.max_pings_without_data", 0),
+ ("grpc.keepalive_permit_without_calls", 0),
+ ]
+ )
+
if endpoint_options is not None:
if endpoint_options.ssl_target_name_override:
_default_connect_options.append(
diff --git a/contrib/python/ydb/py3/ydb/issues.py b/contrib/python/ydb/py3/ydb/issues.py
index 4e76f5ed2b0..1971870ca2b 100644
--- a/contrib/python/ydb/py3/ydb/issues.py
+++ b/contrib/python/ydb/py3/ydb/issues.py
@@ -50,6 +50,7 @@ class StatusCode(enum.IntEnum):
UNAUTHENTICATED = _CLIENT_STATUSES_FIRST + 30
SESSION_POOL_EMPTY = _CLIENT_STATUSES_FIRST + 40
+ SESSION_POOL_CLOSED = _CLIENT_STATUSES_FIRST + 50
# TODO: convert from proto IssueMessage
@@ -178,6 +179,13 @@ class SessionPoolEmpty(Error, queue.Empty):
status = StatusCode.SESSION_POOL_EMPTY
+class SessionPoolClosed(Error):
+ status = StatusCode.SESSION_POOL_CLOSED
+
+ def __init__(self):
+ super().__init__("Session pool is closed.")
+
+
class ClientInternalError(Error):
status = StatusCode.CLIENT_INTERNAL_ERROR
diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py
index 2c16716c513..4007e72dddc 100644
--- a/contrib/python/ydb/py3/ydb/query/base.py
+++ b/contrib/python/ydb/py3/ydb/query/base.py
@@ -137,40 +137,43 @@ def create_execute_query_request(
parameters: Optional[dict],
concurrent_result_sets: Optional[bool],
) -> ydb_query.ExecuteQueryRequest:
- syntax = QuerySyntax.YQL_V1 if not syntax else syntax
- exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
- stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode
+ try:
+ syntax = QuerySyntax.YQL_V1 if not syntax else syntax
+ exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
+ stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode
- tx_control = None
- if not tx_id and not tx_mode:
tx_control = None
- elif tx_id:
- tx_control = ydb_query.TransactionControl(
- tx_id=tx_id,
- commit_tx=commit_tx,
- begin_tx=None,
- )
- else:
- tx_control = ydb_query.TransactionControl(
- begin_tx=ydb_query.TransactionSettings(
- tx_mode=tx_mode,
+ if not tx_id and not tx_mode:
+ tx_control = None
+ elif tx_id:
+ tx_control = ydb_query.TransactionControl(
+ tx_id=tx_id,
+ commit_tx=commit_tx,
+ begin_tx=None,
+ )
+ else:
+ tx_control = ydb_query.TransactionControl(
+ begin_tx=ydb_query.TransactionSettings(
+ tx_mode=tx_mode,
+ ),
+ commit_tx=commit_tx,
+ tx_id=None,
+ )
+
+ return ydb_query.ExecuteQueryRequest(
+ session_id=session_id,
+ query_content=ydb_query.QueryContent.from_public(
+ query=query,
+ syntax=syntax,
),
- commit_tx=commit_tx,
- tx_id=None,
+ tx_control=tx_control,
+ exec_mode=exec_mode,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ stats_mode=stats_mode,
)
-
- return ydb_query.ExecuteQueryRequest(
- session_id=session_id,
- query_content=ydb_query.QueryContent.from_public(
- query=query,
- syntax=syntax,
- ),
- tx_control=tx_control,
- exec_mode=exec_mode,
- parameters=parameters,
- concurrent_result_sets=concurrent_result_sets,
- stats_mode=stats_mode,
- )
+ except BaseException as e:
+ raise issues.ClientInternalError("Unable to prepare execute request") from e
def bad_session_handler(func):
diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py
index 1cf95ac0d13..fc05950c9d6 100644
--- a/contrib/python/ydb/py3/ydb/query/pool.py
+++ b/contrib/python/ydb/py3/ydb/query/pool.py
@@ -1,4 +1,5 @@
import logging
+from concurrent import futures
from typing import (
Callable,
Optional,
@@ -36,14 +37,17 @@ class QuerySessionPool:
size: int = 100,
*,
query_client_settings: Optional[QueryClientSettings] = None,
+ workers_threads_count: int = 4,
):
"""
:param driver: A driver instance.
:param size: Max size of Session Pool.
:param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
+ :param workers_threads_count: A number of threads in executor used for *_async methods
"""
self._driver = driver
+ self._tp = futures.ThreadPoolExecutor(workers_threads_count)
self._queue = queue.Queue()
self._current_size = 0
self._size = size
@@ -72,7 +76,7 @@ class QuerySessionPool:
try:
if self._should_stop.is_set():
logger.error("An attempt to take session from closed session pool.")
- raise RuntimeError("An attempt to take session from closed session pool.")
+ raise issues.SessionPoolClosed()
session = None
try:
@@ -132,6 +136,9 @@ class QuerySessionPool:
:return: Result sets or exception in case of execution errors.
"""
+ if self._should_stop.is_set():
+ raise issues.SessionPoolClosed()
+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
def wrapped_callee():
@@ -140,6 +147,38 @@ class QuerySessionPool:
return retry_operation_sync(wrapped_callee, retry_settings)
+ def retry_tx_async(
+ self,
+ callee: Callable,
+ tx_mode: Optional[BaseQueryTxMode] = None,
+ retry_settings: Optional[RetrySettings] = None,
+ *args,
+ **kwargs,
+ ) -> futures.Future:
+ """Asynchronously execute a transaction in a retriable way."""
+
+ if self._should_stop.is_set():
+ raise issues.SessionPoolClosed()
+
+ return self._tp.submit(
+ self.retry_tx_sync,
+ callee,
+ tx_mode,
+ retry_settings,
+ *args,
+ **kwargs,
+ )
+
+ def retry_operation_async(
+ self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs
+ ) -> futures.Future:
+ """Asynchronously execute a retryable operation."""
+
+ if self._should_stop.is_set():
+ raise issues.SessionPoolClosed()
+
+ return self._tp.submit(self.retry_operation_sync, callee, retry_settings, *args, **kwargs)
+
def retry_tx_sync(
self,
callee: Callable,
@@ -161,6 +200,9 @@ class QuerySessionPool:
:return: Result sets or exception in case of execution errors.
"""
+ if self._should_stop.is_set():
+ raise issues.SessionPoolClosed()
+
tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
retry_settings = RetrySettings() if retry_settings is None else retry_settings
@@ -194,6 +236,9 @@ class QuerySessionPool:
:return: Result sets or exception in case of execution errors.
"""
+ if self._should_stop.is_set():
+ raise issues.SessionPoolClosed()
+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
def wrapped_callee():
@@ -203,11 +248,34 @@ class QuerySessionPool:
return retry_operation_sync(wrapped_callee, retry_settings)
+ def execute_with_retries_async(
+ self,
+ query: str,
+ parameters: Optional[dict] = None,
+ retry_settings: Optional[RetrySettings] = None,
+ *args,
+ **kwargs,
+ ) -> futures.Future:
+ """Asynchronously execute a query with retries."""
+
+ if self._should_stop.is_set():
+ raise issues.SessionPoolClosed()
+
+ return self._tp.submit(
+ self.execute_with_retries,
+ query,
+ parameters,
+ retry_settings,
+ *args,
+ **kwargs,
+ )
+
def stop(self, timeout=None):
acquire_timeout = timeout if timeout is not None else -1
acquired = self._lock.acquire(timeout=acquire_timeout)
try:
self._should_stop.set()
+ self._tp.shutdown(wait=True)
while True:
try:
session = self._queue.get_nowait()
diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py
index aa6c7eb4f3b..5e86be68e17 100644
--- a/contrib/python/ydb/py3/ydb/topic.py
+++ b/contrib/python/ydb/py3/ydb/topic.py
@@ -122,7 +122,7 @@ class TopicClientAsyncIO:
def __del__(self):
if not self._closed:
try:
- logger.warning("Topic client was not closed properly. Consider using method close().")
+ logger.debug("Topic client was not closed properly. Consider using method close().")
self.close()
except BaseException:
logger.warning("Something went wrong during topic client close in __del__")
@@ -161,6 +161,7 @@ class TopicClientAsyncIO:
:param consumers: List of consumers for this topic
:param metering_mode: Metering mode for the topic in a serverless database
"""
+ logger.debug("Create topic request: path=%s", path)
args = locals().copy()
del args["self"]
req = _ydb_topic_public_types.CreateTopicRequestParams(**args)
@@ -210,6 +211,7 @@ class TopicClientAsyncIO:
:param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden.
Empty list mean disable codec compatibility checks for the topic.
"""
+ logger.debug("Alter topic request: path=%s", path)
args = locals().copy()
del args["self"]
req = _ydb_topic_public_types.AlterTopicRequestParams(**args)
@@ -222,6 +224,7 @@ class TopicClientAsyncIO:
)
async def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription:
+ logger.debug("Describe topic request: path=%s", path)
args = locals().copy()
del args["self"]
req = _ydb_topic_public_types.DescribeTopicRequestParams(**args)
@@ -234,6 +237,7 @@ class TopicClientAsyncIO:
return res.to_public()
async def drop_topic(self, path: str):
+ logger.debug("Drop topic request: path=%s", path)
req = _ydb_topic_public_types.DropTopicRequestParams(path=path)
await self._driver(
req.to_proto(),
@@ -257,6 +261,8 @@ class TopicClientAsyncIO:
event_handler: Optional[TopicReaderEvents.EventHandler] = None,
) -> TopicReaderAsyncIO:
+ logger.debug("Create reader for topic=%s consumer=%s", topic, consumer)
+
if not decoder_executor:
decoder_executor = self._executor
@@ -301,6 +307,7 @@ class TopicClientAsyncIO:
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
encoder_executor: Optional[concurrent.futures.Executor] = None,
) -> TopicWriterAsyncIO:
+ logger.debug("Create writer for topic=%s producer_id=%s", topic, producer_id)
args = locals().copy()
del args["self"]
@@ -329,6 +336,7 @@ class TopicClientAsyncIO:
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
encoder_executor: Optional[concurrent.futures.Executor] = None,
) -> TopicTxWriterAsyncIO:
+ logger.debug("Create tx writer for topic=%s tx=%s", topic, tx)
args = locals().copy()
del args["self"]
del args["tx"]
@@ -343,6 +351,13 @@ class TopicClientAsyncIO:
async def commit_offset(
self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None
) -> None:
+ logger.debug(
+ "Commit offset: path=%s partition_id=%s offset=%s consumer=%s",
+ path,
+ partition_id,
+ offset,
+ consumer,
+ )
req = _ydb_topic.CommitOffsetRequest(
path=path,
consumer=consumer,
@@ -362,6 +377,7 @@ class TopicClientAsyncIO:
if self._closed:
return
+ logger.debug("Close topic client")
self._closed = True
self._executor.shutdown(wait=False)
@@ -433,6 +449,7 @@ class TopicClient:
:param consumers: List of consumers for this topic
:param metering_mode: Metering mode for the topic in a serverless database
"""
+ logger.debug("Create topic request: path=%s", path)
args = locals().copy()
del args["self"]
self._check_closed()
@@ -484,6 +501,7 @@ class TopicClient:
:param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden.
Empty list mean disable codec compatibility checks for the topic.
"""
+ logger.debug("Alter topic request: path=%s", path)
args = locals().copy()
del args["self"]
self._check_closed()
@@ -498,6 +516,7 @@ class TopicClient:
)
def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription:
+ logger.debug("Describe topic request: path=%s", path)
args = locals().copy()
del args["self"]
self._check_closed()
@@ -514,6 +533,8 @@ class TopicClient:
def drop_topic(self, path: str):
self._check_closed()
+ logger.debug("Drop topic request: path=%s", path)
+
req = _ydb_topic_public_types.DropTopicRequestParams(path=path)
self._driver(
req.to_proto(),
@@ -536,6 +557,7 @@ class TopicClient:
auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
event_handler: Optional[TopicReaderEvents.EventHandler] = None,
) -> TopicReader:
+ logger.debug("Create reader for topic=%s consumer=%s", topic, consumer)
if not decoder_executor:
decoder_executor = self._executor
@@ -580,6 +602,7 @@ class TopicClient:
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
) -> TopicWriter:
+ logger.debug("Create writer for topic=%s producer_id=%s", topic, producer_id)
args = locals().copy()
del args["self"]
self._check_closed()
@@ -609,6 +632,7 @@ class TopicClient:
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
) -> TopicWriter:
+ logger.debug("Create tx writer for topic=%s tx=%s", topic, tx)
args = locals().copy()
del args["self"]
del args["tx"]
@@ -624,6 +648,13 @@ class TopicClient:
def commit_offset(
self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None
) -> None:
+ logger.debug(
+ "Commit offset: path=%s partition_id=%s offset=%s consumer=%s",
+ path,
+ partition_id,
+ offset,
+ consumer,
+ )
req = _ydb_topic.CommitOffsetRequest(
path=path,
consumer=consumer,
@@ -643,8 +674,10 @@ class TopicClient:
if self._closed:
return
+ logger.debug("Close topic client")
self._closed = True
self._executor.shutdown(wait=False)
+ logger.debug("Topic client was closed")
def _check_closed(self):
if not self._closed:
diff --git a/contrib/python/ydb/py3/ydb/types.py b/contrib/python/ydb/py3/ydb/types.py
index 47c9c48c2e2..5ef601a7ca2 100644
--- a/contrib/python/ydb/py3/ydb/types.py
+++ b/contrib/python/ydb/py3/ydb/types.py
@@ -32,8 +32,10 @@ def _from_date(x: ydb_value_pb2.Value, table_client_settings: table.TableClientS
return x.uint32_value
-def _to_date(pb: ydb_value_pb2.Value, value: typing.Union[date, int]) -> None:
- if isinstance(value, date):
+def _to_date(pb: ydb_value_pb2.Value, value: typing.Union[date, datetime, int]) -> None:
+ if isinstance(value, datetime):
+ pb.uint32_value = (value.date() - _EPOCH.date()).days
+ elif isinstance(value, date):
pb.uint32_value = (value - _EPOCH.date()).days
else:
pb.uint32_value = value
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index f3776ea68db..9062621ad98 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.21.4"
+VERSION = "3.21.6"