diff options
author | robot-piglet <[email protected]> | 2025-07-02 12:53:14 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-07-02 13:02:06 +0300 |
commit | dbe1b39bd441dc9ae17f4d4879ce1b3f36000157 (patch) | |
tree | 2b2c35e345096db5e0c081359e4e035cc7f1a3dd /contrib/python | |
parent | 7254af9393639e2abb0986cc9d84c4cfd37ae1f1 (diff) |
Intermediate changes
commit_hash:c14b9dac6c17e532eb9edba5e3247d73f6994c6e
Diffstat (limited to 'contrib/python')
-rw-r--r-- | contrib/python/ydb/py3/.dist-info/METADATA | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ya.make | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py | 55 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py | 2 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py | 6 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py | 85 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py | 19 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/connection.py | 22 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/issues.py | 8 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/base.py | 63 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/query/pool.py | 70 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/topic.py | 35 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/types.py | 6 | ||||
-rw-r--r-- | contrib/python/ydb/py3/ydb/ydb_version.py | 2 |
14 files changed, 321 insertions, 56 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 065349ff4c5..b93ac851639 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.21.4 +Version: 3.21.6 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index ec6ff9104d0..7e224e5c310 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.21.4) +VERSION(3.21.6) LICENSE(Apache-2.0) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index 24e8fa9ec0a..7baadacb3e0 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -99,7 +99,7 @@ class PublicAsyncIOReader: def __del__(self): if not self._closed: try: - logger.warning("Topic reader was not closed properly. Consider using method close().") + logger.debug("Topic reader was not closed properly. Consider using method close().") task = self._loop.create_task(self.close(flush=False)) topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader") except BaseException: @@ -121,6 +121,7 @@ class PublicAsyncIOReader: use asyncio.wait_for for wait with timeout. """ + logger.debug("receive_batch max_messages=%s", max_messages) await self._reconnector.wait_message() return self._reconnector.receive_batch_nowait( max_messages=max_messages, @@ -137,6 +138,7 @@ class PublicAsyncIOReader: use asyncio.wait_for for wait with timeout. """ + logger.debug("receive_batch_with_tx tx=%s max_messages=%s", tx, max_messages) await self._reconnector.wait_message() return self._reconnector.receive_batch_with_tx_nowait( tx=tx, @@ -149,6 +151,7 @@ class PublicAsyncIOReader: use asyncio.wait_for for wait with timeout. """ + logger.debug("receive_message") await self._reconnector.wait_message() return self._reconnector.receive_message_nowait() @@ -159,6 +162,7 @@ class PublicAsyncIOReader: For the method no way check the commit result (for example if lost connection - commits will not re-send and committed messages will receive again). """ + logger.debug("commit message or batch") if self._settings.consumer is None: raise issues.Error("Commit operations are not supported for topic reader without consumer.") @@ -177,6 +181,7 @@ class PublicAsyncIOReader: before receive commit ack. Message may be acked or not (if not - it will send in other read session, to this or other reader). """ + logger.debug("commit_with_ack message or batch") if self._settings.consumer is None: raise issues.Error("Commit operations are not supported for topic reader without consumer.") @@ -187,8 +192,10 @@ class PublicAsyncIOReader: if self._closed: raise TopicReaderClosedError() + logger.debug("Close topic reader") self._closed = True await self._reconnector.close(flush) + logger.debug("Topic reader was closed") @property def read_session_id(self) -> Optional[str]: @@ -214,11 +221,12 @@ class ReaderReconnector: settings: topic_reader.PublicReaderSettings, loop: Optional[asyncio.AbstractEventLoop] = None, ): - self._id = self._static_reader_reconnector_counter.inc_and_get() + self._id = ReaderReconnector._static_reader_reconnector_counter.inc_and_get() self._settings = settings self._driver = driver self._loop = loop if loop is not None else asyncio.get_running_loop() self._background_tasks = set() + logger.debug("init reader reconnector id=%s", self._id) self._state_changed = asyncio.Event() self._stream_reader = None @@ -231,13 +239,16 @@ class ReaderReconnector: attempt = 0 while True: try: + logger.debug("reader %s connect attempt %s", self._id, attempt) self._stream_reader = await ReaderStream.create(self._id, self._driver, self._settings) + logger.debug("reader %s connected stream %s", self._id, self._stream_reader._id) attempt = 0 self._state_changed.set() await self._stream_reader.wait_error() except BaseException as err: retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt) if not retry_info.is_retriable: + logger.debug("reader %s stop connection loop due to %s", self._id, err) self._set_first_error(err) return @@ -358,6 +369,7 @@ class ReaderReconnector: return self._stream_reader.commit(batch) async def close(self, flush: bool): + logger.debug("reader reconnector %s close", self._id) if self._stream_reader: await self._stream_reader.close(flush) for task in self._background_tasks: @@ -447,6 +459,8 @@ class ReaderStream: self._settings = settings + logger.debug("created ReaderStream id=%s reconnector=%s", self._id, self._reader_reconnector_id) + @staticmethod async def create( reader_reconnector_id: int, @@ -464,6 +478,7 @@ class ReaderStream: get_token_function=creds.get_auth_token if creds else None, ) await reader._start(stream, settings._init_message()) + logger.debug("reader stream %s started session=%s", reader._id, reader._session_id) return reader async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest): @@ -472,11 +487,13 @@ class ReaderStream: self._started = True self._stream = stream + logger.debug("reader stream %s send init request", self._id) stream.write(StreamReadMessage.FromClient(client_message=init_message)) init_response = await stream.receive() # type: StreamReadMessage.FromServer if isinstance(init_response.server_message, StreamReadMessage.InitResponse): self._session_id = init_response.server_message.session_id + logger.debug("reader stream %s initialized session=%s", self._id, self._session_id) else: raise TopicReaderError("Unexpected message after InitRequest: %s", init_response) @@ -615,6 +632,7 @@ class ReaderStream: async def _read_messages_loop(self): try: + logger.debug("reader stream %s start read loop", self._id) self._stream.write( StreamReadMessage.FromClient( client_message=StreamReadMessage.ReadRequest( @@ -628,6 +646,7 @@ class ReaderStream: _process_response(message.server_status) if isinstance(message.server_message, StreamReadMessage.ReadResponse): + logger.debug("reader stream %s read %s bytes", self._id, message.server_message.bytes_size) self._on_read_response(message.server_message) elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse): @@ -637,18 +656,33 @@ class ReaderStream: message.server_message, StreamReadMessage.StartPartitionSessionRequest, ): + logger.debug( + "reader stream %s start partition %s", + self._id, + message.server_message.partition_session.partition_session_id, + ) await self._on_start_partition_session(message.server_message) elif isinstance( message.server_message, StreamReadMessage.StopPartitionSessionRequest, ): + logger.debug( + "reader stream %s stop partition %s", + self._id, + message.server_message.partition_session_id, + ) self._on_partition_session_stop(message.server_message) elif isinstance( message.server_message, StreamReadMessage.EndPartitionSession, ): + logger.debug( + "reader stream %s end partition %s", + self._id, + message.server_message.partition_session_id, + ) self._on_end_partition_session(message.server_message) elif isinstance(message.server_message, UpdateTokenResponse): @@ -663,6 +697,7 @@ class ReaderStream: self._state_changed.set() except Exception as e: + logger.debug("reader stream %s error: %s", self._id, e) self._set_first_error(e) return @@ -825,6 +860,7 @@ class ReaderStream: async def _decode_batches_loop(self): while True: batch = await self._batches_to_decode.get() + logger.debug("reader stream %s decode batch %s messages", self._id, len(batch.messages)) await self._decode_batch_inplace(batch) self._add_batch_to_queue(batch) self._state_changed.set() @@ -833,9 +869,21 @@ class ReaderStream: part_sess_id = batch._partition_session.id if part_sess_id in self._message_batches: self._message_batches[part_sess_id]._extend(batch) + logger.debug( + "reader stream %s extend batch partition=%s size=%s", + self._id, + part_sess_id, + len(batch.messages), + ) return self._message_batches[part_sess_id] = batch + logger.debug( + "reader stream %s new batch partition=%s size=%s", + self._id, + part_sess_id, + len(batch.messages), + ) async def _decode_batch_inplace(self, batch): if batch._codec == Codec.CODEC_RAW: @@ -882,6 +930,7 @@ class ReaderStream: return self._closed = True + logger.debug("reader stream %s close", self._id) if flush: await self.flush() @@ -899,3 +948,5 @@ class ReaderStream: if self._background_tasks: await asyncio.wait(self._background_tasks) + + logger.debug("reader stream %s was closed", self._id) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py index f7590a2195c..3eea0390f7d 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py @@ -64,7 +64,7 @@ class TopicReaderSync: def __del__(self): if not self._closed: try: - logger.warning("Topic reader was not closed properly. Consider using method close().") + logger.debug("Topic reader was not closed properly. Consider using method close().") self.close(flush=False) except BaseException: logger.warning("Something went wrong during reader close in __del__") diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py index a3e407ed86d..16feded7771 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer.py @@ -213,10 +213,10 @@ def messages_to_proto_requests( tx_identity: Optional[TransactionIdentity], ) -> List[StreamWriteMessage.FromClient]: - gropus = _slit_messages_for_send(messages) + groups = _split_messages_for_send(messages) res = [] # type: List[StreamWriteMessage.FromClient] - for group in gropus: + for group in groups: req = StreamWriteMessage.FromClient( StreamWriteMessage.WriteRequest( messages=list(map(InternalMessage.to_message_data, group)), @@ -254,7 +254,7 @@ _message_data_overhead = ( ) -def _slit_messages_for_send( +def _split_messages_for_send( messages: List[InternalMessage], ) -> List[List[InternalMessage]]: codec_groups = [] # type: List[List[InternalMessage]] diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py index ec5b21661d4..eeecbfd2e81 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py @@ -26,6 +26,7 @@ from .. import ( _apis, issues, ) +from .._utilities import AtomicCounter from .._errors import check_retriable_error from .._topic_common import common as topic_common from ..retries import RetrySettings @@ -82,7 +83,7 @@ class WriterAsyncIO: if self._closed or self._loop.is_closed(): return try: - logger.warning("Topic writer was not closed properly. Consider using method close().") + logger.debug("Topic writer was not closed properly. Consider using method close().") task = self._loop.create_task(self.close(flush=False)) topic_common.wrap_set_name_for_asyncio_task(task, task_name="close writer") except BaseException: @@ -92,9 +93,11 @@ class WriterAsyncIO: if self._closed: return + logger.debug("Close topic writer") self._closed = True await self._reconnector.close(flush) + logger.debug("Topic writer was closed") async def write_with_ack( self, @@ -108,6 +111,10 @@ class WriterAsyncIO: For wait with timeout use asyncio.wait_for. """ + logger.debug( + "write_with_ack %s messages", + len(messages) if isinstance(messages, list) else 1, + ) futures = await self.write_with_ack_future(messages) if not isinstance(futures, list): futures = [futures] @@ -129,6 +136,10 @@ class WriterAsyncIO: For wait with timeout use asyncio.wait_for. """ + logger.debug( + "write_with_ack_future %s messages", + len(messages) if isinstance(messages, list) else 1, + ) input_single_message = not isinstance(messages, list) converted_messages = [] if isinstance(messages, list): @@ -153,6 +164,10 @@ class WriterAsyncIO: For wait with timeout use asyncio.wait_for. """ + logger.debug( + "write %s messages", + len(messages) if isinstance(messages, list) else 1, + ) await self.write_with_ack_future(messages) async def flush(self): @@ -162,6 +177,7 @@ class WriterAsyncIO: For wait with timeout use asyncio.wait_for. """ + logger.debug("flush writer") return await self._reconnector.flush() async def wait_init(self) -> PublicWriterInitInfo: @@ -170,6 +186,7 @@ class WriterAsyncIO: For wait with timeout use asyncio.wait_for() """ + logger.debug("wait writer init") return await self._reconnector.wait_init() @@ -225,6 +242,8 @@ class TxWriterAsyncIO(WriterAsyncIO): class WriterAsyncIOReconnector: + _static_id_counter = AtomicCounter() + _closed: bool _loop: asyncio.AbstractEventLoop _credentials: Union[ydb.credentials.Credentials, None] @@ -260,6 +279,7 @@ class WriterAsyncIOReconnector: self, driver: SupportedDriverType, settings: WriterSettings, tx: Optional["BaseQueryTxContext"] = None ): self._closed = False + self._id = WriterAsyncIOReconnector._static_id_counter.inc_and_get() self._loop = asyncio.get_running_loop() self._driver = driver self._credentials = driver._credentials @@ -307,12 +327,13 @@ class WriterAsyncIOReconnector: ] self._state_changed = asyncio.Event() + logger.debug("init writer reconnector id=%s", self._id) async def close(self, flush: bool): if self._closed: return self._closed = True - logger.debug("Close writer reconnector") + logger.debug("Close writer reconnector id=%s", self._id) if flush: await self.flush() @@ -329,6 +350,8 @@ class WriterAsyncIOReconnector: except TopicWriterStopped: pass + logger.debug("Writer reconnector id=%s was closed", self._id) + async def wait_init(self) -> PublicWriterInitInfo: while True: if self._stop_reason.done(): @@ -418,6 +441,7 @@ class WriterAsyncIOReconnector: # noinspection PyBroadException stream_writer = None try: + logger.debug("writer reconnector %s connect attempt %s", self._id, attempt) tx_identity = None if self._tx is None else self._tx._tx_identity() stream_writer = await WriterAsyncIOStream.create( self._driver, @@ -425,6 +449,11 @@ class WriterAsyncIOReconnector: self._settings.update_token_interval, tx_identity=tx_identity, ) + logger.debug( + "writer reconnector %s connected stream %s", + self._id, + stream_writer._id, + ) try: if self._init_info is None: self._last_known_seq_no = stream_writer.last_seqno @@ -458,6 +487,11 @@ class WriterAsyncIOReconnector: return await asyncio.sleep(err_info.sleep_timeout_seconds) + logger.debug( + "writer reconnector %s retry in %s seconds", + self._id, + err_info.sleep_timeout_seconds, + ) except (asyncio.CancelledError, Exception) as err: self._stop(err) @@ -477,6 +511,12 @@ class WriterAsyncIOReconnector: while not self._messages_for_encode.empty(): messages.extend(self._messages_for_encode.get_nowait()) + logger.debug( + "writer reconnector %s encode %s messages", + self._id, + len(messages), + ) + batch_codec = await self._codec_selector(messages) await self._encode_data_inplace(batch_codec, messages) self._add_messages_to_send_queue(messages) @@ -582,6 +622,8 @@ class WriterAsyncIOReconnector: while True: resp = await writer.receive() + logger.debug("writer reconnector %s received %s acks", self._id, len(resp.acks)) + for ack in resp.acks: self._handle_receive_ack(ack) @@ -604,20 +646,37 @@ class WriterAsyncIOReconnector: else: raise TopicWriterError("internal error - receive unexpected ack message.") message_future.set_result(result) + logger.debug( + "writer reconnector %s ack seqno=%s result=%s", + self._id, + ack.seq_no, + type(result).__name__, + ) async def _send_loop(self, writer: "WriterAsyncIOStream"): try: + logger.debug("writer reconnector %s send loop start", self._id) messages = list(self._messages) last_seq_no = 0 for m in messages: writer.write([m]) + logger.debug( + "writer reconnector %s sent buffered message seqno=%s", + self._id, + m.seq_no, + ) last_seq_no = m.seq_no while True: m = await self._new_messages.get() # type: InternalMessage if m.seq_no > last_seq_no: writer.write([m]) + logger.debug( + "writer reconnector %s sent message seqno=%s", + self._id, + m.seq_no, + ) except asyncio.CancelledError: # the loop task cancelled be parent code, for example for reconnection # no need to stop all work. @@ -639,7 +698,7 @@ class WriterAsyncIOReconnector: f.set_exception(reason) self._state_changed.set() - logger.info("Stop topic writer: %s" % reason) + logger.info("Stop topic writer %s: %s" % (self._id, reason)) async def flush(self): if not self._messages_future: @@ -650,6 +709,8 @@ class WriterAsyncIOReconnector: class WriterAsyncIOStream: + _static_id_counter = AtomicCounter() + # todo slots _closed: bool @@ -674,6 +735,7 @@ class WriterAsyncIOStream: tx_identity: Optional[TransactionIdentity] = None, ): self._closed = False + self._id = WriterAsyncIOStream._static_id_counter.inc_and_get() self._update_token_interval = update_token_interval self._get_token_function = get_token_function @@ -686,12 +748,14 @@ class WriterAsyncIOStream: if self._closed: return self._closed = True + logger.debug("writer stream %s close", self._id) if self._update_token_task: self._update_token_task.cancel() await asyncio.wait([self._update_token_task]) self._stream.close() + logger.debug("writer stream %s was closed", self._id) @staticmethod async def create( @@ -711,6 +775,11 @@ class WriterAsyncIOStream: tx_identity=tx_identity, ) await writer._start(stream, init_request) + logger.debug( + "writer stream %s started seqno=%s", + writer._id, + writer.last_seqno, + ) return writer async def receive(self) -> StreamWriteMessage.WriteResponse: @@ -727,6 +796,7 @@ class WriterAsyncIOStream: raise Exception("Unknown message while read writer answers: %s" % item) async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMessage.InitRequest): + logger.debug("writer stream %s send init request", self._id) stream.write(StreamWriteMessage.FromClient(init_message)) resp = await stream.receive() @@ -736,6 +806,11 @@ class WriterAsyncIOStream: self.last_seqno = resp.last_seq_no self.supported_codecs = [PublicCodec(codec) for codec in resp.supported_codecs] + logger.debug( + "writer stream %s init done last_seqno=%s", + self._id, + self.last_seqno, + ) self._stream = stream @@ -755,6 +830,8 @@ class WriterAsyncIOStream: if self._closed: raise RuntimeError("Can not write on closed stream.") + logger.debug("writer stream %s send %s messages", self._id, len(messages)) + for request in messages_to_proto_requests(messages, self._tx_identity): self._stream.write(request) @@ -764,6 +841,7 @@ class WriterAsyncIOStream: token = self._get_token_function() if asyncio.iscoroutine(token): token = await token + logger.debug("writer stream %s update token", self._id) await self._update_token(token=token) async def _update_token(self, token: str): @@ -771,5 +849,6 @@ class WriterAsyncIOStream: try: msg = StreamWriteMessage.FromClient(UpdateTokenRequest(token)) self._stream.write(msg) + logger.debug("writer stream %s token sent", self._id) finally: self._update_token_event.clear() diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py index 954864c9682..7806d7faba8 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_sync.py @@ -76,7 +76,7 @@ class WriterSync: def __del__(self): if not self._closed: try: - logger.warning("Topic writer was not closed properly. Consider using method close().") + logger.debug("Topic writer was not closed properly. Consider using method close().") self.close(flush=False) except BaseException: logger.warning("Something went wrong during writer close in __del__") @@ -85,6 +85,7 @@ class WriterSync: if self._closed: return + logger.debug("Close topic writer") self._closed = True self._caller.safe_call_with_result(self._async_writer.close(flush=flush), timeout) @@ -101,16 +102,22 @@ class WriterSync: def flush(self, *, timeout=None): self._check_closed() + logger.debug("flush writer") + return self._caller.unsafe_call_with_result(self._async_writer.flush(), timeout) def async_wait_init(self) -> Future[PublicWriterInitInfo]: self._check_closed() + logger.debug("wait writer init") + return self._caller.unsafe_call_with_future(self._async_writer.wait_init()) def wait_init(self, *, timeout: TimeoutType = None) -> PublicWriterInitInfo: self._check_closed() + logger.debug("wait writer init") + return self._caller.unsafe_call_with_result(self._async_writer.wait_init(), timeout) def write( @@ -120,6 +127,11 @@ class WriterSync: ): self._check_closed() + logger.debug( + "write %s messages", + len(messages) if isinstance(messages, list) else 1, + ) + self._caller.safe_call_with_result(self._async_writer.write(messages), timeout) def async_write_with_ack( @@ -137,6 +149,11 @@ class WriterSync: ) -> Union[PublicWriteResult, List[PublicWriteResult]]: self._check_closed() + logger.debug( + "write_with_ack %s messages", + len(messages) if isinstance(messages, list) else 1, + ) + return self._caller.unsafe_call_with_result(self._async_writer.write_with_ack(messages), timeout=timeout) diff --git a/contrib/python/ydb/py3/ydb/connection.py b/contrib/python/ydb/py3/ydb/connection.py index 8e65cd3b833..d5b6ed50c69 100644 --- a/contrib/python/ydb/py3/ydb/connection.py +++ b/contrib/python/ydb/py3/ydb/connection.py @@ -26,6 +26,7 @@ YDB_TRACE_ID_HEADER = "x-ydb-trace-id" YDB_REQUEST_TYPE_HEADER = "x-ydb-request-type" _DEFAULT_MAX_GRPC_MESSAGE_SIZE = 64 * 10**6 +_DEFAULT_KEEPALIVE_TIMEOUT = 10000 def _message_to_string(message): @@ -185,15 +186,18 @@ def _construct_channel_options(driver_config, endpoint_options=None): getattr(driver_config, "grpc_lb_policy_name", "round_robin"), ), ] - if driver_config.grpc_keep_alive_timeout is not None: - _default_connect_options.extend( - [ - ("grpc.keepalive_time_ms", driver_config.grpc_keep_alive_timeout >> 3), - ("grpc.keepalive_timeout_ms", driver_config.grpc_keep_alive_timeout), - ("grpc.http2.max_pings_without_data", 0), - ("grpc.keepalive_permit_without_calls", 0), - ] - ) + if driver_config.grpc_keep_alive_timeout is None: + driver_config.grpc_keep_alive_timeout = _DEFAULT_KEEPALIVE_TIMEOUT + + _default_connect_options.extend( + [ + ("grpc.keepalive_time_ms", driver_config.grpc_keep_alive_timeout >> 3), + ("grpc.keepalive_timeout_ms", driver_config.grpc_keep_alive_timeout), + ("grpc.http2.max_pings_without_data", 0), + ("grpc.keepalive_permit_without_calls", 0), + ] + ) + if endpoint_options is not None: if endpoint_options.ssl_target_name_override: _default_connect_options.append( diff --git a/contrib/python/ydb/py3/ydb/issues.py b/contrib/python/ydb/py3/ydb/issues.py index 4e76f5ed2b0..1971870ca2b 100644 --- a/contrib/python/ydb/py3/ydb/issues.py +++ b/contrib/python/ydb/py3/ydb/issues.py @@ -50,6 +50,7 @@ class StatusCode(enum.IntEnum): UNAUTHENTICATED = _CLIENT_STATUSES_FIRST + 30 SESSION_POOL_EMPTY = _CLIENT_STATUSES_FIRST + 40 + SESSION_POOL_CLOSED = _CLIENT_STATUSES_FIRST + 50 # TODO: convert from proto IssueMessage @@ -178,6 +179,13 @@ class SessionPoolEmpty(Error, queue.Empty): status = StatusCode.SESSION_POOL_EMPTY +class SessionPoolClosed(Error): + status = StatusCode.SESSION_POOL_CLOSED + + def __init__(self): + super().__init__("Session pool is closed.") + + class ClientInternalError(Error): status = StatusCode.CLIENT_INTERNAL_ERROR diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py index 2c16716c513..4007e72dddc 100644 --- a/contrib/python/ydb/py3/ydb/query/base.py +++ b/contrib/python/ydb/py3/ydb/query/base.py @@ -137,40 +137,43 @@ def create_execute_query_request( parameters: Optional[dict], concurrent_result_sets: Optional[bool], ) -> ydb_query.ExecuteQueryRequest: - syntax = QuerySyntax.YQL_V1 if not syntax else syntax - exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode - stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode + try: + syntax = QuerySyntax.YQL_V1 if not syntax else syntax + exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode + stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode - tx_control = None - if not tx_id and not tx_mode: tx_control = None - elif tx_id: - tx_control = ydb_query.TransactionControl( - tx_id=tx_id, - commit_tx=commit_tx, - begin_tx=None, - ) - else: - tx_control = ydb_query.TransactionControl( - begin_tx=ydb_query.TransactionSettings( - tx_mode=tx_mode, + if not tx_id and not tx_mode: + tx_control = None + elif tx_id: + tx_control = ydb_query.TransactionControl( + tx_id=tx_id, + commit_tx=commit_tx, + begin_tx=None, + ) + else: + tx_control = ydb_query.TransactionControl( + begin_tx=ydb_query.TransactionSettings( + tx_mode=tx_mode, + ), + commit_tx=commit_tx, + tx_id=None, + ) + + return ydb_query.ExecuteQueryRequest( + session_id=session_id, + query_content=ydb_query.QueryContent.from_public( + query=query, + syntax=syntax, ), - commit_tx=commit_tx, - tx_id=None, + tx_control=tx_control, + exec_mode=exec_mode, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + stats_mode=stats_mode, ) - - return ydb_query.ExecuteQueryRequest( - session_id=session_id, - query_content=ydb_query.QueryContent.from_public( - query=query, - syntax=syntax, - ), - tx_control=tx_control, - exec_mode=exec_mode, - parameters=parameters, - concurrent_result_sets=concurrent_result_sets, - stats_mode=stats_mode, - ) + except BaseException as e: + raise issues.ClientInternalError("Unable to prepare execute request") from e def bad_session_handler(func): diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index 1cf95ac0d13..fc05950c9d6 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -1,4 +1,5 @@ import logging +from concurrent import futures from typing import ( Callable, Optional, @@ -36,14 +37,17 @@ class QuerySessionPool: size: int = 100, *, query_client_settings: Optional[QueryClientSettings] = None, + workers_threads_count: int = 4, ): """ :param driver: A driver instance. :param size: Max size of Session Pool. :param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior + :param workers_threads_count: A number of threads in executor used for *_async methods """ self._driver = driver + self._tp = futures.ThreadPoolExecutor(workers_threads_count) self._queue = queue.Queue() self._current_size = 0 self._size = size @@ -72,7 +76,7 @@ class QuerySessionPool: try: if self._should_stop.is_set(): logger.error("An attempt to take session from closed session pool.") - raise RuntimeError("An attempt to take session from closed session pool.") + raise issues.SessionPoolClosed() session = None try: @@ -132,6 +136,9 @@ class QuerySessionPool: :return: Result sets or exception in case of execution errors. """ + if self._should_stop.is_set(): + raise issues.SessionPoolClosed() + retry_settings = RetrySettings() if retry_settings is None else retry_settings def wrapped_callee(): @@ -140,6 +147,38 @@ class QuerySessionPool: return retry_operation_sync(wrapped_callee, retry_settings) + def retry_tx_async( + self, + callee: Callable, + tx_mode: Optional[BaseQueryTxMode] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ) -> futures.Future: + """Asynchronously execute a transaction in a retriable way.""" + + if self._should_stop.is_set(): + raise issues.SessionPoolClosed() + + return self._tp.submit( + self.retry_tx_sync, + callee, + tx_mode, + retry_settings, + *args, + **kwargs, + ) + + def retry_operation_async( + self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs + ) -> futures.Future: + """Asynchronously execute a retryable operation.""" + + if self._should_stop.is_set(): + raise issues.SessionPoolClosed() + + return self._tp.submit(self.retry_operation_sync, callee, retry_settings, *args, **kwargs) + def retry_tx_sync( self, callee: Callable, @@ -161,6 +200,9 @@ class QuerySessionPool: :return: Result sets or exception in case of execution errors. """ + if self._should_stop.is_set(): + raise issues.SessionPoolClosed() + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() retry_settings = RetrySettings() if retry_settings is None else retry_settings @@ -194,6 +236,9 @@ class QuerySessionPool: :return: Result sets or exception in case of execution errors. """ + if self._should_stop.is_set(): + raise issues.SessionPoolClosed() + retry_settings = RetrySettings() if retry_settings is None else retry_settings def wrapped_callee(): @@ -203,11 +248,34 @@ class QuerySessionPool: return retry_operation_sync(wrapped_callee, retry_settings) + def execute_with_retries_async( + self, + query: str, + parameters: Optional[dict] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ) -> futures.Future: + """Asynchronously execute a query with retries.""" + + if self._should_stop.is_set(): + raise issues.SessionPoolClosed() + + return self._tp.submit( + self.execute_with_retries, + query, + parameters, + retry_settings, + *args, + **kwargs, + ) + def stop(self, timeout=None): acquire_timeout = timeout if timeout is not None else -1 acquired = self._lock.acquire(timeout=acquire_timeout) try: self._should_stop.set() + self._tp.shutdown(wait=True) while True: try: session = self._queue.get_nowait() diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py index aa6c7eb4f3b..5e86be68e17 100644 --- a/contrib/python/ydb/py3/ydb/topic.py +++ b/contrib/python/ydb/py3/ydb/topic.py @@ -122,7 +122,7 @@ class TopicClientAsyncIO: def __del__(self): if not self._closed: try: - logger.warning("Topic client was not closed properly. Consider using method close().") + logger.debug("Topic client was not closed properly. Consider using method close().") self.close() except BaseException: logger.warning("Something went wrong during topic client close in __del__") @@ -161,6 +161,7 @@ class TopicClientAsyncIO: :param consumers: List of consumers for this topic :param metering_mode: Metering mode for the topic in a serverless database """ + logger.debug("Create topic request: path=%s", path) args = locals().copy() del args["self"] req = _ydb_topic_public_types.CreateTopicRequestParams(**args) @@ -210,6 +211,7 @@ class TopicClientAsyncIO: :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden. Empty list mean disable codec compatibility checks for the topic. """ + logger.debug("Alter topic request: path=%s", path) args = locals().copy() del args["self"] req = _ydb_topic_public_types.AlterTopicRequestParams(**args) @@ -222,6 +224,7 @@ class TopicClientAsyncIO: ) async def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription: + logger.debug("Describe topic request: path=%s", path) args = locals().copy() del args["self"] req = _ydb_topic_public_types.DescribeTopicRequestParams(**args) @@ -234,6 +237,7 @@ class TopicClientAsyncIO: return res.to_public() async def drop_topic(self, path: str): + logger.debug("Drop topic request: path=%s", path) req = _ydb_topic_public_types.DropTopicRequestParams(path=path) await self._driver( req.to_proto(), @@ -257,6 +261,8 @@ class TopicClientAsyncIO: event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReaderAsyncIO: + logger.debug("Create reader for topic=%s consumer=%s", topic, consumer) + if not decoder_executor: decoder_executor = self._executor @@ -301,6 +307,7 @@ class TopicClientAsyncIO: # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, ) -> TopicWriterAsyncIO: + logger.debug("Create writer for topic=%s producer_id=%s", topic, producer_id) args = locals().copy() del args["self"] @@ -329,6 +336,7 @@ class TopicClientAsyncIO: # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, ) -> TopicTxWriterAsyncIO: + logger.debug("Create tx writer for topic=%s tx=%s", topic, tx) args = locals().copy() del args["self"] del args["tx"] @@ -343,6 +351,13 @@ class TopicClientAsyncIO: async def commit_offset( self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None ) -> None: + logger.debug( + "Commit offset: path=%s partition_id=%s offset=%s consumer=%s", + path, + partition_id, + offset, + consumer, + ) req = _ydb_topic.CommitOffsetRequest( path=path, consumer=consumer, @@ -362,6 +377,7 @@ class TopicClientAsyncIO: if self._closed: return + logger.debug("Close topic client") self._closed = True self._executor.shutdown(wait=False) @@ -433,6 +449,7 @@ class TopicClient: :param consumers: List of consumers for this topic :param metering_mode: Metering mode for the topic in a serverless database """ + logger.debug("Create topic request: path=%s", path) args = locals().copy() del args["self"] self._check_closed() @@ -484,6 +501,7 @@ class TopicClient: :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden. Empty list mean disable codec compatibility checks for the topic. """ + logger.debug("Alter topic request: path=%s", path) args = locals().copy() del args["self"] self._check_closed() @@ -498,6 +516,7 @@ class TopicClient: ) def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription: + logger.debug("Describe topic request: path=%s", path) args = locals().copy() del args["self"] self._check_closed() @@ -514,6 +533,8 @@ class TopicClient: def drop_topic(self, path: str): self._check_closed() + logger.debug("Drop topic request: path=%s", path) + req = _ydb_topic_public_types.DropTopicRequestParams(path=path) self._driver( req.to_proto(), @@ -536,6 +557,7 @@ class TopicClient: auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReader: + logger.debug("Create reader for topic=%s consumer=%s", topic, consumer) if not decoder_executor: decoder_executor = self._executor @@ -580,6 +602,7 @@ class TopicClient: # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriter: + logger.debug("Create writer for topic=%s producer_id=%s", topic, producer_id) args = locals().copy() del args["self"] self._check_closed() @@ -609,6 +632,7 @@ class TopicClient: # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriter: + logger.debug("Create tx writer for topic=%s tx=%s", topic, tx) args = locals().copy() del args["self"] del args["tx"] @@ -624,6 +648,13 @@ class TopicClient: def commit_offset( self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None ) -> None: + logger.debug( + "Commit offset: path=%s partition_id=%s offset=%s consumer=%s", + path, + partition_id, + offset, + consumer, + ) req = _ydb_topic.CommitOffsetRequest( path=path, consumer=consumer, @@ -643,8 +674,10 @@ class TopicClient: if self._closed: return + logger.debug("Close topic client") self._closed = True self._executor.shutdown(wait=False) + logger.debug("Topic client was closed") def _check_closed(self): if not self._closed: diff --git a/contrib/python/ydb/py3/ydb/types.py b/contrib/python/ydb/py3/ydb/types.py index 47c9c48c2e2..5ef601a7ca2 100644 --- a/contrib/python/ydb/py3/ydb/types.py +++ b/contrib/python/ydb/py3/ydb/types.py @@ -32,8 +32,10 @@ def _from_date(x: ydb_value_pb2.Value, table_client_settings: table.TableClientS return x.uint32_value -def _to_date(pb: ydb_value_pb2.Value, value: typing.Union[date, int]) -> None: - if isinstance(value, date): +def _to_date(pb: ydb_value_pb2.Value, value: typing.Union[date, datetime, int]) -> None: + if isinstance(value, datetime): + pb.uint32_value = (value.date() - _EPOCH.date()).days + elif isinstance(value, date): pb.uint32_value = (value - _EPOCH.date()).days else: pb.uint32_value = value diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index f3776ea68db..9062621ad98 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.21.4" +VERSION = "3.21.6" |