From 2135a97d85fc48400cc3d0942311be98bc9148da Mon Sep 17 00:00:00 2001 From: robot-piglet Date: Wed, 1 Jul 2026 11:17:19 +0300 Subject: Intermediate changes commit_hash:f46c06c29696f6cfd01e306372ca9b430bd6b9e3 --- contrib/python/ydb/py3/.dist-info/METADATA | 5 +- contrib/python/ydb/py3/README.md | 1 + .../py3/patches/03-change-inport-protobufs.patch | 5 +- contrib/python/ydb/py3/ya.make | 6 +- .../py3/ydb/_topic_reader/topic_reader_asyncio.py | 131 ++++++++++++---- .../py3/ydb/_topic_writer/topic_writer_asyncio.py | 37 +++-- contrib/python/ydb/py3/ydb/aio/connection.py | 19 ++- contrib/python/ydb/py3/ydb/aio/driver.py | 1 + contrib/python/ydb/py3/ydb/aio/pool.py | 40 ++++- contrib/python/ydb/py3/ydb/aio/query/base.py | 16 +- contrib/python/ydb/py3/ydb/aio/query/session.py | 39 +++-- .../python/ydb/py3/ydb/aio/query/transaction.py | 81 ++++++---- contrib/python/ydb/py3/ydb/aio/table.py | 3 +- contrib/python/ydb/py3/ydb/connection.py | 26 +++- contrib/python/ydb/py3/ydb/convert.py | 53 +++++-- .../python/ydb/py3/ydb/opentelemetry/__init__.py | 36 +++++ contrib/python/ydb/py3/ydb/opentelemetry/plugin.py | 134 +++++++++++++++++ .../python/ydb/py3/ydb/opentelemetry/tracing.py | 165 +++++++++++++++++++++ contrib/python/ydb/py3/ydb/pool.py | 51 +++++-- contrib/python/ydb/py3/ydb/query/base.py | 17 ++- contrib/python/ydb/py3/ydb/query/session.py | 67 +++++++-- contrib/python/ydb/py3/ydb/query/transaction.py | 85 +++++++---- contrib/python/ydb/py3/ydb/resolver.py | 6 +- contrib/python/ydb/py3/ydb/retries.py | 64 +++++--- contrib/python/ydb/py3/ydb/ydb_version.py | 2 +- 25 files changed, 894 insertions(+), 196 deletions(-) create mode 100644 contrib/python/ydb/py3/ydb/opentelemetry/__init__.py create mode 100644 contrib/python/ydb/py3/ydb/opentelemetry/plugin.py create mode 100644 contrib/python/ydb/py3/ydb/opentelemetry/tracing.py (limited to 'contrib/python') diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 3c0337ed5e0..809298530a7 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.28.4 +Version: 3.29.6 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC @@ -21,6 +21,8 @@ Requires-Dist: grpcio >=1.42.0 Requires-Dist: packaging Requires-Dist: protobuf <7.0.0,>=3.13.0 Requires-Dist: aiohttp <4 +Provides-Extra: opentelemetry +Requires-Dist: opentelemetry-api >=1.0.0 ; extra == 'opentelemetry' Provides-Extra: yc Requires-Dist: yandexcloud ; extra == 'yc' @@ -31,6 +33,7 @@ YDB Python SDK [![API Reference](https://img.shields.io/badge/API-Reference-lightgreen.svg)](https://ydb-platform.github.io/ydb-python-sdk) [![Functional tests](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml) [![Style checks](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml) +[![codecov](https://codecov.io/gh/ydb-platform/ydb-python-sdk/graph/badge.svg)](https://codecov.io/gh/ydb-platform/ydb-python-sdk) Officially supported Python client for YDB. diff --git a/contrib/python/ydb/py3/README.md b/contrib/python/ydb/py3/README.md index 7842bd1d910..eeda8db3b42 100644 --- a/contrib/python/ydb/py3/README.md +++ b/contrib/python/ydb/py3/README.md @@ -5,6 +5,7 @@ YDB Python SDK [![API Reference](https://img.shields.io/badge/API-Reference-lightgreen.svg)](https://ydb-platform.github.io/ydb-python-sdk) [![Functional tests](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml) [![Style checks](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml) +[![codecov](https://codecov.io/gh/ydb-platform/ydb-python-sdk/graph/badge.svg)](https://codecov.io/gh/ydb-platform/ydb-python-sdk) Officially supported Python client for YDB. diff --git a/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch b/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch index a865f95a382..156c6d7799e 100644 --- a/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch +++ b/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch @@ -87,10 +87,11 @@ from ..common.protos import ydb_topic_pb2 --- contrib/python/ydb/py3/ydb/aio/connection.py (index) +++ contrib/python/ydb/py3/ydb/aio/connection.py (working tree) -@@ -27,11 +27,10 @@ from ydb.driver import DriverConfig +@@ -28,12 +28,11 @@ from ydb import issues from ydb.settings import BaseRequestSettings from ydb import issues - + from ydb.opentelemetry.tracing import get_trace_metadata + -# Workaround for good IDE and universal for runtime -if TYPE_CHECKING: - from ydb._grpc.v4 import ydb_topic_v1_pb2_grpc diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index b75adf12a55..5c39099e85f 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.28.4) +VERSION(3.29.6) LICENSE(Apache-2.0) @@ -16,6 +16,7 @@ PEERDIR( NO_LINT() NO_CHECK_IMPORTS( + ydb.opentelemetry.plugin ydb.public.api.grpc ydb.public.api.grpc.* ) @@ -104,6 +105,9 @@ PY_SRCS( ydb/oauth2_token_exchange/__init__.py ydb/oauth2_token_exchange/token_exchange.py ydb/oauth2_token_exchange/token_source.py + ydb/opentelemetry/__init__.py + ydb/opentelemetry/plugin.py + ydb/opentelemetry/tracing.py ydb/operation.py ydb/pool.py ydb/query/__init__.py diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index f4e5a4f88a5..ee988a1ec3b 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -217,6 +217,7 @@ class ReaderReconnector: _stream_reader: Optional["ReaderStream"] _first_error: asyncio.Future[YdbError] _tx_to_batches_map: Dict[str, typing.List[datatypes.PublicBatch]] + _closed: bool def __init__( self, @@ -233,6 +234,7 @@ class ReaderReconnector: self._state_changed = asyncio.Event() self._stream_reader = None + self._closed = False self._background_tasks.add(asyncio.create_task(self._connection_loop())) self._first_error = asyncio.get_running_loop().create_future() @@ -241,6 +243,8 @@ class ReaderReconnector: async def _connection_loop(self): attempt = 0 while True: + if self._closed: + return try: logger.debug("reader %s connect attempt %s", self._id, attempt) self._stream_reader = await ReaderStream.create(self._id, self._driver, self._settings) @@ -266,8 +270,12 @@ class ReaderReconnector: # noinspection PyBroadException try: await self._stream_reader.close(flush=False) - except BaseException: - # supress any error on close stream reader + except asyncio.CancelledError: + # propagate cancellation (e.g. from reader.close()) so the loop stops + # instead of swallowing it and reconnecting into a zombie stream + raise + except Exception: + # suppress any error on close stream reader pass async def wait_message(self): @@ -323,14 +331,44 @@ class ReaderReconnector: tx._add_callback(TxEvent.AFTER_COMMIT, self._handle_after_tx_commit, self._loop) tx._add_callback(TxEvent.AFTER_ROLLBACK, self._handle_after_tx_rollback, self._loop) + def _batch_partition_session_expired(self, batch: datatypes.PublicBatch) -> bool: + # A batch is expired if the reader reconnected after it was received: its partition + # session no longer belongs to the current stream. Mirrors the guard in + # ReaderStream.commit() for the non-transactional commit path. + stream = self._stream_reader + partition_session = batch._partition_session + return ( + stream is None + or partition_session.reader_stream_id != stream._id + or partition_session.id not in stream._partition_sessions + ) + async def _commit_batches_with_tx(self, tx: "BaseQueryTxContext"): tx_id = tx.tx_id if tx_id is None: raise TopicReaderError("Transaction ID is None") + + batches = self._tx_to_batches_map[tx_id] + + if any(self._batch_partition_session_expired(batch) for batch in batches): + # The reader reconnected between receive_batch_with_tx() and tx.commit(), so + # these offsets belong to a partition session that no longer exists. Committing + # them would send a stale/gapped range (server "Gap", issue_code 2011) while the + # client believes the commit succeeded. Fail the tx instead (retriable) without + # sending the request; the AFTER_COMMIT handler then reconnects to reset the + # read-ahead state, and the pool re-reads from the committed offset. + err = issues.ClientInternalError( + "Topic reader partition session expired before tx commit; " + "offsets were not committed, the transaction will be retried" + ) + tx._set_external_error(err) + del self._tx_to_batches_map[tx_id] + return + grouped_batches: Dict[str, Dict[int, typing.List[datatypes.PublicBatch]]] = defaultdict( lambda: defaultdict(list) ) - for batch in self._tx_to_batches_map[tx_id]: + for batch in batches: grouped_batches[batch._partition_session.topic_path][batch._partition_session.partition_id].append(batch) consumer = self._settings.consumer @@ -401,8 +439,16 @@ class ReaderReconnector: async def close(self, flush: bool): logger.debug("reader reconnector %s close", self._id) + # Mark closed so the connection loop won't start a new stream, then close the + # current stream with the requested flush before cancelling the loop. On a normal + # close this flushes pending commits; cancelling the loop first would let it close + # the stream with flush=False instead and skip the flush. + self._closed = True if self._stream_reader: await self._stream_reader.close(flush) + # Wake any pending wait_message() waiter (e.g. a concurrent receive) so it doesn't + # hang if the loop was reconnecting when close() cancelled it. + self._set_first_error(TopicReaderStreamClosedError()) for task in self._background_tasks: task.cancel() @@ -469,6 +515,11 @@ class ReaderStream: self._id = ReaderStream._static_id_counter.inc_and_get() self._reader_reconnector_id = reader_reconnector_id self._session_id = "not initialized" + self._log_prefix = "reader %s stream %s session=%s" % ( + self._reader_reconnector_id, + self._id, + self._session_id, + ) self._stream = None self._started = False self._background_tasks = set() @@ -503,17 +554,28 @@ class ReaderStream: settings: topic_reader.PublicReaderSettings, ) -> "ReaderStream": stream = GrpcWrapperAsyncIO(StreamReadMessage.FromServer.from_proto) + reader = None + try: + await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamRead) - await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamRead) - - creds = driver._credentials - reader = ReaderStream( - reader_reconnector_id, - settings, - get_token_function=creds.get_auth_token if creds else None, - ) - await reader._start(stream, settings._init_message()) - logger.debug("reader stream %s started session=%s", reader._id, reader._session_id) + creds = driver._credentials + reader = ReaderStream( + reader_reconnector_id, + settings, + get_token_function=creds.get_auth_token if creds else None, + ) + await reader._start(stream, settings._init_message()) + except BaseException: + # If create() is interrupted (e.g. reader.close() cancels the connection loop + # mid-reconnect) the in-flight stream is not yet assigned to the reconnector, so + # its finally cannot reach it. Close it here to avoid a zombie gRPC read session + # that keeps holding the consumer's partition on the server. + if reader is not None: + await reader.close(flush=False) + else: + stream.close() + raise + logger.debug("%s started", reader._log_prefix) return reader async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest): @@ -522,7 +584,7 @@ class ReaderStream: self._started = True self._stream = stream - logger.debug("reader stream %s send init request", self._id) + logger.debug("%s send init request", self._log_prefix) stream.write(StreamReadMessage.FromClient(client_message=init_message)) try: @@ -534,7 +596,12 @@ class ReaderStream: if isinstance(init_response.server_message, StreamReadMessage.InitResponse): self._session_id = init_response.server_message.session_id - logger.debug("reader stream %s initialized session=%s", self._id, self._session_id) + self._log_prefix = "reader %s stream %s session=%s" % ( + self._reader_reconnector_id, + self._id, + self._session_id, + ) + logger.debug("%s initialized", self._log_prefix) else: raise TopicReaderError("Unexpected message after InitRequest: %s" % init_response) @@ -668,7 +735,7 @@ class ReaderStream: async def _read_messages_loop(self): try: - logger.debug("reader stream %s start read loop", self._id) + logger.debug("%s start read loop", self._log_prefix) self._stream.write( StreamReadMessage.FromClient( client_message=StreamReadMessage.ReadRequest( @@ -682,7 +749,7 @@ class ReaderStream: _process_response(message.server_status) if isinstance(message.server_message, StreamReadMessage.ReadResponse): - logger.debug("reader stream %s read %s bytes", self._id, message.server_message.bytes_size) + logger.debug("%s read %s bytes", self._log_prefix, message.server_message.bytes_size) self._on_read_response(message.server_message) elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse): @@ -693,8 +760,8 @@ class ReaderStream: StreamReadMessage.StartPartitionSessionRequest, ): logger.debug( - "reader stream %s start partition %s", - self._id, + "%s start partition %s", + self._log_prefix, message.server_message.partition_session.partition_session_id, ) await self._on_start_partition_session(message.server_message) @@ -704,8 +771,8 @@ class ReaderStream: StreamReadMessage.StopPartitionSessionRequest, ): logger.debug( - "reader stream %s stop partition %s", - self._id, + "%s stop partition %s", + self._log_prefix, message.server_message.partition_session_id, ) self._on_partition_session_stop(message.server_message) @@ -715,8 +782,8 @@ class ReaderStream: StreamReadMessage.EndPartitionSession, ): logger.debug( - "reader stream %s end partition %s", - self._id, + "%s end partition %s", + self._log_prefix, message.server_message.partition_session_id, ) self._on_end_partition_session(message.server_message) @@ -733,12 +800,12 @@ class ReaderStream: self._state_changed.set() except asyncio.CancelledError as e: - logger.debug("reader stream %s error: %s", self._id, e) + logger.debug("%s error: %s", self._log_prefix, e) if not self._closed: self._set_first_error(issues.ConnectionLost("gRPC stream cancelled")) raise except Exception as e: - logger.debug("reader stream %s error: %s", self._id, e) + logger.debug("%s error: %s", self._log_prefix, e) self._set_first_error(e) return @@ -908,7 +975,7 @@ class ReaderStream: async def _decode_batches_loop(self): while True: batch = await self._batches_to_decode.get() - logger.debug("reader stream %s decode batch %s messages", self._id, len(batch.messages)) + logger.debug("%s decode batch %s messages", self._log_prefix, len(batch.messages)) await self._decode_batch_inplace(batch) self._add_batch_to_queue(batch) self._state_changed.set() @@ -918,8 +985,8 @@ class ReaderStream: if part_sess_id in self._message_batches: self._message_batches[part_sess_id]._extend(batch) logger.debug( - "reader stream %s extend batch partition=%s size=%s", - self._id, + "%s extend batch partition=%s size=%s", + self._log_prefix, part_sess_id, len(batch.messages), ) @@ -927,8 +994,8 @@ class ReaderStream: self._message_batches[part_sess_id] = batch logger.debug( - "reader stream %s new batch partition=%s size=%s", - self._id, + "%s new batch partition=%s size=%s", + self._log_prefix, part_sess_id, len(batch.messages), ) @@ -979,7 +1046,7 @@ class ReaderStream: return self._closed = True - logger.debug("reader stream %s close", self._id) + logger.debug("%s close", self._log_prefix) if flush: await self.flush() @@ -999,4 +1066,4 @@ class ReaderStream: if self._background_tasks: await asyncio.wait(self._background_tasks) - logger.debug("reader stream %s was closed", self._id) + logger.debug("%s was closed", self._log_prefix) diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py index 5e55917d830..ff9c59f7989 100644 --- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py @@ -833,7 +833,8 @@ class WriterAsyncIOStream: self._update_token_task.cancel() await asyncio.wait([self._update_token_task]) - self._stream.close() + if getattr(self, "_stream", None) is not None: + self._stream.close() logger.debug("writer stream %s was closed", self._id) @staticmethod @@ -845,15 +846,27 @@ class WriterAsyncIOStream: ) -> "WriterAsyncIOStream": stream = GrpcWrapperAsyncIO(StreamWriteMessage.FromServer.from_proto) - await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamWrite) + writer = None + try: + await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamWrite) - creds = driver._credentials - writer = WriterAsyncIOStream( - update_token_interval=update_token_interval, - get_token_function=creds.get_auth_token if creds else lambda: "", - tx_identity=tx_identity, - ) - await writer._start(stream, init_request) + creds = driver._credentials + writer = WriterAsyncIOStream( + update_token_interval=update_token_interval, + get_token_function=creds.get_auth_token if creds else lambda: "", + tx_identity=tx_identity, + ) + await writer._start(stream, init_request) + except BaseException: + # If create() fails after stream.start() (e.g. the connection is lost while + # waiting for the init response), the stream is not yet handed to the + # reconnector, so its finally cannot reach it. Close it here to avoid a + # stranded gRPC consumption thread that blocks forever on the request queue. + if writer is not None: + await writer.close() + else: + stream.close() + raise logger.debug( "writer stream %s started seqno=%s", writer._id, @@ -875,6 +888,10 @@ class WriterAsyncIOStream: raise Exception("Unknown message while read writer answers: %s" % item) async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMessage.InitRequest): + # Assign before the init handshake (mirrors ReaderStream._start) so that if _start + # fails mid-handshake, close() can still reach the stream and release its gRPC thread. + self._stream = stream + logger.debug("writer stream %s send init request", self._id) stream.write(StreamWriteMessage.FromClient(init_message)) @@ -895,8 +912,6 @@ class WriterAsyncIOStream: self.last_seqno, ) - self._stream = stream - if self._update_token_interval is not None: self._update_token_event.set() self._update_token_task = asyncio.create_task(self._update_token_loop()) diff --git a/contrib/python/ydb/py3/ydb/aio/connection.py b/contrib/python/ydb/py3/ydb/aio/connection.py index 3cb6290e1d4..02320bf9afc 100644 --- a/contrib/python/ydb/py3/ydb/aio/connection.py +++ b/contrib/python/ydb/py3/ydb/aio/connection.py @@ -26,6 +26,7 @@ from ydb.connection import ( from ydb.driver import DriverConfig from ydb.settings import BaseRequestSettings from ydb import issues +from ydb.opentelemetry.tracing import get_trace_metadata try: from ydb.public.api.grpc import ydb_topic_v1_pb2_grpc @@ -70,6 +71,9 @@ async def _construct_metadata( metadata.append((YDB_REQUEST_TYPE_HEADER, settings.request_type)) metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ()))) + + metadata.extend(get_trace_metadata()) + return metadata @@ -148,6 +152,9 @@ class Connection: "closing", "endpoint_key", "node_id", + "peer_address", + "peer_port", + "peer_location", ) def __init__( @@ -156,10 +163,12 @@ class Connection: driver_config: Optional[DriverConfig] = None, endpoint_options: Optional[EndpointOptions] = None, ) -> None: - global _stubs_list self.endpoint = endpoint self.endpoint_key = EndpointKey(self.endpoint, getattr(endpoint_options, "node_id", None)) self.node_id = getattr(endpoint_options, "node_id", None) + self.peer_address = getattr(endpoint_options, "address", None) + self.peer_port = getattr(endpoint_options, "port", None) + self.peer_location = getattr(endpoint_options, "location", None) self._channel = channel_factory(self.endpoint, driver_config, grpc.aio, endpoint_options=endpoint_options) self._driver_config = driver_config @@ -248,8 +257,12 @@ class Connection: :param grace: :return: None """ - if hasattr(self, "_channel") and hasattr(self._channel, "close"): - await self._channel.close(grace) + channel = getattr(self, "_channel", None) + if channel is not None and hasattr(channel, "close"): + await channel.close(grace) + + self._stub_instances.clear() + self._channel = None def add_cleanup_callback(self, callback: Callable[["Connection"], None]) -> None: self._cleanup_callbacks.append(callback) diff --git a/contrib/python/ydb/py3/ydb/aio/driver.py b/contrib/python/ydb/py3/ydb/aio/driver.py index 405e5fcbf37..88221e947db 100644 --- a/contrib/python/ydb/py3/ydb/aio/driver.py +++ b/contrib/python/ydb/py3/ydb/aio/driver.py @@ -69,6 +69,7 @@ class Driver(pool.ConnectionPool): root_certificates, credentials, config_class=DriverConfig, + **kwargs, ) super(Driver, self).__init__(config) diff --git a/contrib/python/ydb/py3/ydb/aio/pool.py b/contrib/python/ydb/py3/ydb/aio/pool.py index fe7091332ce..4d952086c55 100644 --- a/contrib/python/ydb/py3/ydb/aio/pool.py +++ b/contrib/python/ydb/py3/ydb/aio/pool.py @@ -6,6 +6,7 @@ import random from typing import Any, Callable, Optional, Tuple, TYPE_CHECKING from ydb import issues +from ydb.opentelemetry.tracing import SpanName, create_ydb_span from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool from .connection import Connection, EndpointKey @@ -247,17 +248,36 @@ class ConnectionPool(IConnectionPool): self._store = ConnectionsCache(driver_config.use_all_nodes) self._grpc_init = Connection(self._driver_config.endpoint, self._driver_config) self._stopped = False + self._stopping = False self._discovery: Optional[Discovery] = None self._discovery_task: "asyncio.Task[None]" if driver_config.disable_discovery: # If discovery is disabled, just add the initial endpoint to the store async def init_connection() -> None: - ready_connection = Connection(self._driver_config.endpoint, self._driver_config) - await ready_connection.connection_ready( - ready_timeout=getattr(self._driver_config, "discovery_request_timeout", 10) - ) - self._store.add(ready_connection) + ready_timeout = getattr(self._driver_config, "discovery_request_timeout", 10) + while not self._stopping: + ready_connection = Connection(self._driver_config.endpoint, self._driver_config) + try: + await ready_connection.connection_ready(ready_timeout=ready_timeout) + except asyncio.CancelledError: + try: + await ready_connection.close() + except Exception: + logger.debug("Failed to close cancelled initial connection", exc_info=True) + raise + except Exception: + logger.debug("Initial connection attempt failed", exc_info=True) + try: + await ready_connection.close() + except Exception: + logger.debug("Failed to close unsuccessful initial connection", exc_info=True) + if not self._stopping: + await asyncio.sleep(1) + continue + + self._store.add(ready_connection) + return # Create and schedule the task to initialize the connection self._discovery_task = asyncio.get_event_loop().create_task(init_connection()) @@ -267,6 +287,7 @@ class ConnectionPool(IConnectionPool): self._discovery_task = asyncio.get_event_loop().create_task(self._discovery.run()) async def stop(self, timeout: int = 10) -> None: # type: ignore[override] # async override of sync method + self._stopping = True if self._discovery: self._discovery.stop() await self._grpc_init.close() @@ -274,6 +295,12 @@ class ConnectionPool(IConnectionPool): await asyncio.wait_for(self._discovery_task, timeout=timeout) except asyncio.TimeoutError: self._discovery_task.cancel() + try: + await self._discovery_task + except asyncio.CancelledError: + pass + if self._discovery is None: + await self._store.cleanup() self._stopped = True def _on_disconnected(self, connection: Connection) -> Callable[[], Any]: @@ -285,7 +312,8 @@ class ConnectionPool(IConnectionPool): return __wrapper__ async def wait(self, timeout: Optional[float] = 7.0, fail_fast: bool = False) -> None: # type: ignore[override] # async override of sync method - await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0) + with create_ydb_span(SpanName.DRIVER_INITIALIZE, self._driver_config, kind="internal").attach_context(): + await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0) def discovery_debug_details(self) -> str: if self._discovery: diff --git a/contrib/python/ydb/py3/ydb/aio/query/base.py b/contrib/python/ydb/py3/ydb/aio/query/base.py index a1d6e5d74fc..6c13dfd400a 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/base.py +++ b/contrib/python/ydb/py3/ydb/aio/query/base.py @@ -2,9 +2,12 @@ from .. import _utilities class AsyncResponseContextIterator(_utilities.AsyncResponseIterator): - def __init__(self, it, wrapper, on_error=None): + """Async ExecuteQuery result stream.""" + + def __init__(self, it, wrapper, on_error=None, on_finish=None): super().__init__(it, wrapper) self._on_error = on_error + self._on_finish = on_finish async def __aenter__(self) -> "AsyncResponseContextIterator": return self @@ -15,6 +18,7 @@ class AsyncResponseContextIterator(_utilities.AsyncResponseIterator): except StopAsyncIteration: # Normal stream termination is not an error and must not invalidate # the session. + self._call_on_finish() raise except BaseException as e: # BaseException (not Exception) because asyncio.CancelledError @@ -25,8 +29,17 @@ class AsyncResponseContextIterator(_utilities.AsyncResponseIterator): # reply with SessionBusy. if self._on_error: self._on_error(e) + self._call_on_finish(e) raise + def _call_on_finish(self, exception=None): + if self._on_finish is not None: + self._on_finish(exception) + self._on_finish = None + + def __del__(self): + self._call_on_finish() + async def __aexit__(self, exc_type, exc_val, exc_tb): # To close stream on YDB it is necessary to scroll through it to the end. # Errors that happen during the cleanup drain have already been reported @@ -39,3 +52,4 @@ class AsyncResponseContextIterator(_utilities.AsyncResponseIterator): pass except BaseException: pass + self._call_on_finish() diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py index a565b266e82..b776b638268 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/session.py +++ b/contrib/python/ydb/py3/ydb/aio/query/session.py @@ -19,6 +19,7 @@ from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public from ...query import base from ...query.session import BaseQuerySession +from ...opentelemetry.tracing import SpanName, create_ydb_span, set_peer_attributes, span_finish_callback from ..._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT @@ -105,8 +106,10 @@ class QuerySession(BaseQuerySession["AsyncDriver"]): if self._closed: raise RuntimeError("Session is already closed") - await self._create_call(settings=settings) - await self._attach() + with create_ydb_span(SpanName.CREATE_SESSION, self._driver_config).attach_context() as span: + await self._create_call(settings=settings) + set_peer_attributes(span, self._peer) + await self._attach() return self @@ -159,20 +162,27 @@ class QuerySession(BaseQuerySession["AsyncDriver"]): """ self._check_session_ready_to_use() - stream_it = await self._execute_call( - query=query, - parameters=parameters, - commit_tx=True, - syntax=syntax, - exec_mode=exec_mode, - stats_mode=stats_mode, - schema_inclusion_mode=schema_inclusion_mode, - result_set_format=result_set_format, - arrow_format_settings=arrow_format_settings, - concurrent_result_sets=concurrent_result_sets, - settings=settings, + span = create_ydb_span( + SpanName.EXECUTE_QUERY, + self._driver_config, + node_id=self._node_id, + peer=self._peer, ) + with span.attach_context(end_on_exit=False): + stream_it = await self._execute_call( + query=query, + parameters=parameters, + commit_tx=True, + syntax=syntax, + exec_mode=exec_mode, + stats_mode=stats_mode, + schema_inclusion_mode=schema_inclusion_mode, + result_set_format=result_set_format, + arrow_format_settings=arrow_format_settings, + concurrent_result_sets=concurrent_result_sets, + settings=settings, + ) return AsyncResponseContextIterator( it=stream_it, wrapper=lambda resp: base.wrap_execute_query_response( @@ -182,6 +192,7 @@ class QuerySession(BaseQuerySession["AsyncDriver"]): settings=self._settings, ), on_error=self._on_execute_stream_error, + on_finish=span_finish_callback(span), ) async def explain( diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py index c31d79fb088..a05d91f2349 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py @@ -12,6 +12,7 @@ from ...query.transaction import ( BaseQueryTxContext, QueryTxStateEnum, ) +from ...opentelemetry.tracing import SpanName, create_ydb_span, span_finish_callback if TYPE_CHECKING: from .session import QuerySession @@ -87,7 +88,13 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]): :return: None or exception if begin is failed """ - await self._begin_call(settings) + with create_ydb_span( + SpanName.BEGIN_TRANSACTION, + self._driver_config, + node_id=self.session.node_id, + peer=getattr(self.session, "_peer", None), + ).attach_context(): + await self._begin_call(settings) return self async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: @@ -109,13 +116,19 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]): await self._ensure_prev_stream_finished() - try: - await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT) - await self._commit_call(settings) - await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None) - except BaseException as e: - await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e) - raise e + with create_ydb_span( + SpanName.COMMIT, + self._driver_config, + node_id=self.session.node_id, + peer=getattr(self.session, "_peer", None), + ).attach_context(): + try: + await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT) + await self._commit_call(settings) + await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None) + except BaseException as e: + await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e) + raise e async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution @@ -136,13 +149,19 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]): await self._ensure_prev_stream_finished() - try: - await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK) - await self._rollback_call(settings) - await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None) - except BaseException as e: - await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e) - raise e + with create_ydb_span( + SpanName.ROLLBACK, + self._driver_config, + node_id=self.session.node_id, + peer=getattr(self.session, "_peer", None), + ).attach_context(): + try: + await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK) + await self._rollback_call(settings) + await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None) + except BaseException as e: + await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e) + raise e async def execute( self, @@ -190,20 +209,27 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]): """ await self._ensure_prev_stream_finished() - stream_it = await self._execute_call( - query=query, - parameters=parameters, - commit_tx=commit_tx, - syntax=syntax, - exec_mode=exec_mode, - stats_mode=stats_mode, - schema_inclusion_mode=schema_inclusion_mode, - result_set_format=result_set_format, - arrow_format_settings=arrow_format_settings, - concurrent_result_sets=concurrent_result_sets, - settings=settings, + span = create_ydb_span( + SpanName.EXECUTE_QUERY, + self._driver_config, + node_id=self.session.node_id, + peer=getattr(self.session, "_peer", None), ) + with span.attach_context(end_on_exit=False): + stream_it = await self._execute_call( + query=query, + parameters=parameters, + commit_tx=commit_tx, + syntax=syntax, + exec_mode=exec_mode, + stats_mode=stats_mode, + schema_inclusion_mode=schema_inclusion_mode, + result_set_format=result_set_format, + arrow_format_settings=arrow_format_settings, + concurrent_result_sets=concurrent_result_sets, + settings=settings, + ) self._prev_stream = AsyncResponseContextIterator( it=stream_it, wrapper=lambda resp: base.wrap_execute_query_response( @@ -215,5 +241,6 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]): settings=self.session._settings, ), on_error=self.session._on_execute_stream_error, + on_finish=span_finish_callback(span), ) return self._prev_stream diff --git a/contrib/python/ydb/py3/ydb/aio/table.py b/contrib/python/ydb/py3/ydb/aio/table.py index 0d14ba2f11b..8d5e02c180d 100644 --- a/contrib/python/ydb/py3/ydb/aio/table.py +++ b/contrib/python/ydb/py3/ydb/aio/table.py @@ -462,7 +462,8 @@ async def retry_operation(callee, retry_settings=None, *args, **kwargs): # pyli opt_generator = ydb.retry_operation_impl(callee, retry_settings, *args, **kwargs) for next_opt in opt_generator: if isinstance(next_opt, ydb.YdbRetryOperationSleepOpt): - await asyncio.sleep(next_opt.timeout) + if next_opt.timeout > 0: + await asyncio.sleep(next_opt.timeout) else: try: return await next_opt.result diff --git a/contrib/python/ydb/py3/ydb/connection.py b/contrib/python/ydb/py3/ydb/connection.py index 98fbd5aab31..7e5ad3675fe 100644 --- a/contrib/python/ydb/py3/ydb/connection.py +++ b/contrib/python/ydb/py3/ydb/connection.py @@ -24,6 +24,7 @@ from google.protobuf import text_format import grpc from . import issues, _apis, _utilities from . import default_pem +from .opentelemetry.tracing import get_trace_metadata _stubs_list = ( _apis.TableService.Stub, @@ -179,6 +180,9 @@ def _construct_metadata(driver_config, settings): metadata.extend(getattr(settings, "headers", [])) metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ()))) + + metadata.extend(get_trace_metadata()) + return metadata @@ -194,11 +198,14 @@ def _get_request_timeout(settings): class EndpointOptions(object): - __slots__ = ("ssl_target_name_override", "node_id") + __slots__ = ("ssl_target_name_override", "node_id", "address", "port", "location") - def __init__(self, ssl_target_name_override=None, node_id=None): + def __init__(self, ssl_target_name_override=None, node_id=None, address=None, port=None, location=None): self.ssl_target_name_override = ssl_target_name_override self.node_id = node_id + self.address = address + self.port = port + self.location = location def _construct_channel_options(driver_config, endpoint_options=None): @@ -405,6 +412,9 @@ class Connection(object): "closing", "endpoint_key", "node_id", + "peer_address", + "peer_port", + "peer_location", ) def __init__( @@ -419,9 +429,11 @@ class Connection(object): discovered by the YDB endpoint discovery mechanism :param driver_config: A driver config instance to be used for RPC call interception """ - global _stubs_list self.endpoint = endpoint self.node_id = getattr(endpoint_options, "node_id", None) + self.peer_address = getattr(endpoint_options, "address", None) + self.peer_port = getattr(endpoint_options, "port", None) + self.peer_location = getattr(endpoint_options, "location", None) self.endpoint_key = EndpointKey(endpoint, getattr(endpoint_options, "node_id", None)) self._channel = channel_factory(self.endpoint, driver_config, endpoint_options=endpoint_options) self._driver_config = driver_config @@ -590,8 +602,12 @@ class Connection(object): self.destroy() def destroy(self): - if hasattr(self, "_channel") and hasattr(self._channel, "close"): - self._channel.close() + channel = getattr(self, "_channel", None) + if channel is not None and hasattr(channel, "close"): + channel.close() + + self._stub_instances.clear() + self._channel = None def ready_future(self): """ diff --git a/contrib/python/ydb/py3/ydb/convert.py b/contrib/python/ydb/py3/ydb/convert.py index b54e8bddc8d..89f28ecc597 100644 --- a/contrib/python/ydb/py3/ydb/convert.py +++ b/contrib/python/ydb/py3/ydb/convert.py @@ -24,11 +24,21 @@ _initialize() class _DotDict(dict): + # A lazy __dict__ is declared on purpose: it is not materialized until a row + # is written to (or its __dict__ is introspected), so untouched read-only + # rows avoid the per-instance dict overhead while callers can still attach + # their own attributes (ORM-style row.extra = ...), which materializes the + # dict for that row alone. + __slots__ = ("__dict__",) + def __init__(self, *args, **kwargs): super(_DotDict, self).__init__(*args, **kwargs) def __getattr__(self, item): - return self[item] + try: + return self[item] + except KeyError: + raise AttributeError(item) from None def _is_decimal_signed(hi_value): @@ -83,7 +93,7 @@ def _pb_to_dict(type_pb, value_pb, table_client_settings): class _Struct(_DotDict): - pass + __slots__ = () def _pb_to_struct(type_pb, value_pb, table_client_settings): @@ -351,6 +361,16 @@ def _unwrap_optionality(column): return _to_native_map.get(current_type), c_type +def _detach_columns(columns): + # The columns container references the source protobuf message, which keeps + # the whole arena (including already-parsed message.rows) alive for as long + # as the result set is held. Copy the schema into a standalone message so the + # source arena can be released right after conversion. + holder = _apis.ydb_value.ResultSet() + holder.columns.extend(columns) + return holder.columns + + class _ResultSet(object): __slots__ = ("columns", "rows", "truncated", "snapshot", "index", "format", "arrow_format_meta", "data") @@ -375,12 +395,17 @@ class _ResultSet(object): for column in message.columns: column_parsers.append(_unwrap_optionality(column)) + # Names are the only per-row metadata we need. Storing this shared tuple + # (instead of the proto columns) keeps rows detached from the source + # protobuf arena, so it can be freed once conversion is done. + column_names = tuple(column.name for column in message.columns) + for row_proto in message.rows: - row = _Row(message.columns) - for column, value, column_info in zip(message.columns, row_proto.items, column_parsers): + row = _Row(column_names) + for name, value, column_info in zip(column_names, row_proto.items, column_parsers): v_type = value.WhichOneof("value") if v_type == "null_flag_value": - row[column.name] = None + row[name] = None continue while v_type == "nested_value": @@ -388,7 +413,7 @@ class _ResultSet(object): v_type = value.WhichOneof("value") column_parser, unwrapped_type = column_info - row[column.name] = column_parser(unwrapped_type, value, table_client_settings) + row[name] = column_parser(unwrapped_type, value, table_client_settings) rows.append(row) from ydb.query import QueryResultSetFormat, ArrowFormatMeta @@ -401,12 +426,17 @@ class _ResultSet(object): data = message.data if message.data else None - return cls(message.columns, rows, message.truncated, snapshot, index, result_format, arrow_meta, data) + return cls( + _detach_columns(message.columns), rows, message.truncated, snapshot, index, result_format, arrow_meta, data + ) @classmethod def lazy_from_message(cls, message, table_client_settings=None, snapshot=None): from ydb.query import QueryResultSetFormat, ArrowFormatMeta + # No _detach_columns here on purpose: _LazyRows defers parsing and keeps + # message.rows, so the source arena is pinned regardless — detaching the + # schema would only add a copy without freeing anything. rows = _LazyRows(message.rows, table_client_settings, message.columns) result_format = message.format if message.format else QueryResultSetFormat.VALUE @@ -422,15 +452,17 @@ ResultSet = _ResultSet class _Row(_DotDict): + __slots__ = ("_columns",) + def __init__(self, columns): super(_Row, self).__init__() self._columns = columns def __getitem__(self, key): if isinstance(key, int): - return self[self._columns[key].name] + return self[self._columns[key]] elif isinstance(key, slice): - return tuple(map(lambda x: self[x.name], self._columns[key])) + return tuple(self[name] for name in self._columns[key]) else: return super(_Row, self).__getitem__(key) @@ -455,10 +487,11 @@ class _LazyRowItem: class _LazyRow(_DotDict): + __slots__ = ("_columns",) + def __init__(self, columns, proto_row, table_client_settings, parsers): super(_LazyRow, self).__init__() self._columns = columns - self._table_client_settings = table_client_settings for i, (column, row_item) in enumerate(zip(self._columns, proto_row.items)): super(_LazyRow, self).__setitem__( column.name, diff --git a/contrib/python/ydb/py3/ydb/opentelemetry/__init__.py b/contrib/python/ydb/py3/ydb/opentelemetry/__init__.py new file mode 100644 index 00000000000..fc058d0d866 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/opentelemetry/__init__.py @@ -0,0 +1,36 @@ +"""Public OpenTelemetry entrypoints for YDB.""" + + +def enable_tracing(tracer=None): + """Enable OpenTelemetry trace context propagation and span creation for all YDB gRPC calls. + + This call is **idempotent**: if tracing is already enabled, later calls do nothing + (including passing a different ``tracer``). Call :func:`disable_tracing` first to + reconfigure or turn instrumentation off. + + Args: + tracer: Optional OTel tracer to use. If not provided, the default tracer named + ``ydb.sdk`` from the global tracer provider will be used. + """ + try: + from ydb.opentelemetry.plugin import _enable_tracing + except ImportError: + raise ImportError( + "OpenTelemetry packages are required for tracing support. " + "Install them with: pip install ydb[opentelemetry]" + ) from None + + _enable_tracing(tracer) + + +def disable_tracing(): + """Disable YDB OpenTelemetry hooks and allow :func:`enable_tracing` to run again.""" + try: + from ydb.opentelemetry.plugin import _disable_tracing + except ImportError: + return + + _disable_tracing() + + +__all__ = ["disable_tracing", "enable_tracing"] diff --git a/contrib/python/ydb/py3/ydb/opentelemetry/plugin.py b/contrib/python/ydb/py3/ydb/opentelemetry/plugin.py new file mode 100644 index 00000000000..76942789f52 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/opentelemetry/plugin.py @@ -0,0 +1,134 @@ +"""OpenTelemetry bridge for YDB.""" + +from opentelemetry import context as otel_context +from opentelemetry import trace +from opentelemetry.propagate import inject +from opentelemetry.trace import StatusCode + +from ydb import issues +from ydb.issues import StatusCode as YdbStatusCode +from ydb.opentelemetry.tracing import _registry + +# YDB client transport StatusCode values (401xxx band) -> OTel error.type transport_error. +_TRANSPORT_STATUSES = frozenset( + { + YdbStatusCode.CONNECTION_LOST, + YdbStatusCode.CONNECTION_FAILURE, + YdbStatusCode.DEADLINE_EXCEEDED, + YdbStatusCode.CLIENT_INTERNAL_ERROR, + YdbStatusCode.UNIMPLEMENTED, + } +) + +_tracer = None +_enabled = False + +_KIND_MAP = { + "client": trace.SpanKind.CLIENT, + "internal": trace.SpanKind.INTERNAL, +} + + +def _otel_metadata_hook(): + """Inject W3C Trace Context into outgoing gRPC metadata using the active OTel context.""" + headers = {} + inject(headers) + return list(headers.items()) + + +def _set_error_on_span(span, exception): + if isinstance(exception, issues.Error) and exception.status is not None: + span.set_attribute("db.response.status_code", exception.status.name) + error_type = "transport_error" if exception.status in _TRANSPORT_STATUSES else "ydb_error" + else: + error_type = type(exception).__qualname__ + + span.set_attribute("error.type", error_type) + span.set_status(StatusCode.ERROR, str(exception)) + span.record_exception(exception) + + +class _AttachContext: + """Make a span the active OTel context for a ``with`` block. + + When ``end_on_exit=True`` (default) the span is ended on exit — used for + single-shot RPCs. When ``end_on_exit=False`` the span is only ended on + exception — used for streaming RPCs where the result iterator owns ``end()``. + """ + + def __init__(self, span, end_on_exit): + self._span = span + self._end_on_exit = end_on_exit + self._token = None + + def __enter__(self): + ctx = trace.set_span_in_context(self._span._span) + self._token = otel_context.attach(ctx) + return self._span + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._token is not None: + otel_context.detach(self._token) + self._token = None + if exc_val is not None: + self._span.set_error(exc_val) + self._span.end() + elif self._end_on_exit: + self._span.end() + return False + + +class TracingSpan: + """Wrapper around an OTel span. + + Use :meth:`attach_context` as a context manager around any RPC call. + The default (``end_on_exit=True``) is for single-shot operations; pass + ``end_on_exit=False`` for streaming RPCs where the result iterator owns + ``end()``. + """ + + def __init__(self, span): + self._span = span + + def set_error(self, exception): + _set_error_on_span(self._span, exception) + + def set_attribute(self, key, value): + self._span.set_attribute(key, value) + + def end(self): + self._span.end() + + def attach_context(self, end_on_exit=True): + return _AttachContext(self, end_on_exit) + + +def _create_span(name, attributes=None, kind=None): + span = _tracer.start_span( + name, + kind=_KIND_MAP.get(kind, trace.SpanKind.CLIENT), + attributes=attributes or {}, + ) + return TracingSpan(span) + + +def _enable_tracing(tracer=None): + global _enabled, _tracer + + if _enabled: + return + + _tracer = tracer if tracer is not None else trace.get_tracer("ydb.sdk") + _enabled = True + _registry.set_metadata_hook(_otel_metadata_hook) + _registry.set_create_span(_create_span) + + +def _disable_tracing(): + """Clear hooks and tracer; after this, :func:`~ydb.opentelemetry.enable_tracing` may be called again.""" + global _enabled, _tracer + + _registry.set_create_span(None) + _registry.set_metadata_hook(None) + _enabled = False + _tracer = None diff --git a/contrib/python/ydb/py3/ydb/opentelemetry/tracing.py b/contrib/python/ydb/py3/ydb/opentelemetry/tracing.py new file mode 100644 index 00000000000..1d0995df8d5 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/opentelemetry/tracing.py @@ -0,0 +1,165 @@ +"""Internal SDK tracing helpers and registry.""" + +import enum +from typing import Optional, Tuple + + +class SpanName(str, enum.Enum): + """Canonical span names used across the YDB SDK.""" + + CREATE_SESSION = "ydb.CreateSession" + EXECUTE_QUERY = "ydb.ExecuteQuery" + BEGIN_TRANSACTION = "ydb.BeginTransaction" + COMMIT = "ydb.Commit" + ROLLBACK = "ydb.Rollback" + DRIVER_INITIALIZE = "ydb.Driver.Initialize" + RUN_WITH_RETRY = "ydb.RunWithRetry" + TRY = "ydb.Try" + + +class _NoopCtx: + __slots__ = ("_span",) + + def __init__(self, span): + self._span = span + + def __enter__(self): + return self._span + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + +class _NoopSpan: + """Returned by create_ydb_span when tracing is disabled.""" + + def set_error(self, exception): + pass + + def set_attribute(self, key, value): + pass + + def end(self): + pass + + def attach_context(self, end_on_exit=True): + return _NoopCtx(self) + + +_NOOP_SPAN = _NoopSpan() + + +class OtelTracingRegistry: + """Singleton registry for OpenTelemetry tracing. + + By default everything is no-op until :func:`~ydb.opentelemetry.enable_tracing` is called. + """ + + def __init__(self): + self._metadata_hook = None + self._create_span_func = None + + def is_active(self) -> bool: + return self._create_span_func is not None + + def create_span(self, name, attributes=None, kind=None): + if self._create_span_func is None: + return _NOOP_SPAN + return self._create_span_func(name, attributes, kind=kind) + + def get_trace_metadata(self): + if self._metadata_hook is not None: + return self._metadata_hook() + return [] + + def set_metadata_hook(self, hook): + self._metadata_hook = hook + + def set_create_span(self, func): + self._create_span_func = func + + +_registry = OtelTracingRegistry() + + +def get_trace_metadata(): + """Return tracing metadata for gRPC calls.""" + return _registry.get_trace_metadata() + + +def _split_endpoint(endpoint: Optional[str]) -> Tuple[str, int]: + ep = endpoint or "" + if ep.startswith("grpcs://"): + ep = ep[len("grpcs://") :] + elif ep.startswith("grpc://"): + ep = ep[len("grpc://") :] + + if ep.startswith("["): + close = ep.find("]") + if close != -1 and len(ep) > close + 1 and ep[close + 1] == ":": + host = ep[: close + 1] + port_s = ep[close + 2 :] + return host, int(port_s) if port_s.isdigit() else 0 + + host, sep, port_s = ep.rpartition(":") + if not sep: + return ep, 0 + return host, int(port_s) if port_s.isdigit() else 0 + + +def _build_ydb_attrs(driver_config, node_id=None, peer=None): + host, port = _split_endpoint(getattr(driver_config, "endpoint", None)) + attrs = { + "db.system.name": "ydb", + "db.namespace": getattr(driver_config, "database", None) or "", + "server.address": host, + "server.port": port, + } + if peer is not None: + address, port_, location = peer + if address is not None: + attrs["network.peer.address"] = address + if port_ is not None: + attrs["network.peer.port"] = int(port_) + if location: + attrs["ydb.node.dc"] = location + if node_id is not None: + attrs["ydb.node.id"] = node_id + return attrs + + +def create_span(name, attributes=None, kind="internal"): + """Create a span with no YDB-specific attributes (used for SDK-internal operations).""" + return _registry.create_span(name, attributes=attributes, kind=kind).attach_context() + + +def create_ydb_span(name, driver_config, node_id=None, kind=None, peer=None): + """Create a span pre-filled with standard YDB attributes.""" + if not _registry.is_active(): + return _NOOP_SPAN + attrs = _build_ydb_attrs(driver_config, node_id, peer) + return _registry.create_span(name, attributes=attrs, kind=kind) + + +def set_peer_attributes(span, peer): + """Fill in network.peer.* and ydb.node.dc on an existing span once the peer is known.""" + if peer is None: + return + address, port, location = peer + if address is not None: + span.set_attribute("network.peer.address", address) + if port is not None: + span.set_attribute("network.peer.port", int(port)) + if location: + span.set_attribute("ydb.node.dc", location) + + +def span_finish_callback(span): + """Return an on_finish callable that ends *span* when a streaming result iterator completes.""" + + def _finish(exception=None): + if exception is not None: + span.set_error(exception) + span.end() + + return _finish diff --git a/contrib/python/ydb/py3/ydb/pool.py b/contrib/python/ydb/py3/ydb/pool.py index 2901c573435..f1a4bdf940b 100644 --- a/contrib/python/ydb/py3/ydb/pool.py +++ b/contrib/python/ydb/py3/ydb/pool.py @@ -10,6 +10,7 @@ import random from typing import Any, Callable, ContextManager, List, Optional, Set, Tuple, TYPE_CHECKING from . import connection as connection_impl, issues, resolver, _utilities, tracing +from .opentelemetry.tracing import SpanName, create_ydb_span from abc import abstractmethod from .connection import Connection, EndpointKey @@ -400,23 +401,41 @@ class ConnectionPool(IConnectionPool): self._store = ConnectionsCache(driver_config.use_all_nodes, driver_config.tracer) self.tracer = driver_config.tracer self._grpc_init = connection_impl.Connection(self._driver_config.endpoint, self._driver_config) + self._stopped = False + self._stop_guard = threading.Lock() + self._stop_event = threading.Event() + self._init_thread: Optional[threading.Thread] = None if driver_config.disable_discovery: - # If discovery is disabled, just add the initial endpoint to the store - ready_connection = connection_impl.Connection.ready_factory( - self._driver_config.endpoint, - self._driver_config, - ready_timeout=getattr(self._driver_config, "discovery_request_timeout", 10), - ) - self._store.add(ready_connection) + # If discovery is disabled, establish the initial connection in a + # background thread, retrying until it succeeds or the pool is stopped. + # Doing this off the constructor keeps wait(timeout) as the blocking + # point and lets stop() interrupt the retry loop. self._discovery_thread = None + self._init_thread = threading.Thread( + name="ydb_driver_initial_connection", + target=self._init_connection, + daemon=True, + ) + self._init_thread.start() else: # Start discovery thread as usual self._discovery_thread = Discovery(self._store, self._driver_config) self._discovery_thread.start() - self._stopped = False - self._stop_guard = threading.Lock() + def _init_connection(self) -> None: + ready_timeout = getattr(self._driver_config, "discovery_request_timeout", 10) + while not self._stopped: + ready_connection = connection_impl.Connection.ready_factory( + self._driver_config.endpoint, + self._driver_config, + ready_timeout=ready_timeout, + ) + if self._store.add(ready_connection): + return + + logger.debug("Initial connection attempt failed") + self._stop_event.wait(1) def stop(self, timeout: int = 10) -> None: """ @@ -430,11 +449,16 @@ class ConnectionPool(IConnectionPool): return self._stopped = True + self._stop_event.set() if self._discovery_thread: self._discovery_thread.stop() self._grpc_init.close() if self._discovery_thread: self._discovery_thread.join(timeout) + if self._init_thread: + self._init_thread.join(timeout) + if self._discovery_thread is None: + self._store.cleanup() def async_wait(self, fail_fast: bool = False) -> "futures.Future[None]": """ @@ -453,10 +477,11 @@ class ConnectionPool(IConnectionPool): :param timeout: A timeout to wait in seconds :return: None """ - if fail_fast: - self._store.add_fast_fail().result(timeout) - else: - self._store.subscribe().result(timeout) + with create_ydb_span(SpanName.DRIVER_INITIALIZE, self._driver_config, kind="internal").attach_context(): + if fail_fast: + self._store.add_fast_fail().result(timeout) + else: + self._store.subscribe().result(timeout) def _on_disconnected(self, connection: Connection) -> None: """ diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py index 7fea6cd0e8c..093c7d55452 100644 --- a/contrib/python/ydb/py3/ydb/query/base.py +++ b/contrib/python/ydb/py3/ydb/query/base.py @@ -27,7 +27,6 @@ from .. import _apis from ydb._topic_common.common import CallFromSyncToAsync, _get_shared_event_loop from ydb._grpc.grpcwrapper.common_utils import to_thread - if typing.TYPE_CHECKING: from .transaction import BaseQueryTxContext from .session import BaseQuerySession @@ -73,9 +72,12 @@ class QueryResultSetFormat(enum.IntEnum): class SyncResponseContextIterator(_utilities.SyncResponseIterator): - def __init__(self, it, wrapper, on_error=None): + """Streams ExecuteQuery results.""" + + def __init__(self, it, wrapper, on_error=None, on_finish=None): super().__init__(it, wrapper) self._on_error = on_error + self._on_finish = on_finish def __enter__(self) -> "SyncResponseContextIterator": return self @@ -86,6 +88,7 @@ class SyncResponseContextIterator(_utilities.SyncResponseIterator): except StopIteration: # Normal stream termination is not an error and must not invalidate # the session. + self._call_on_finish() raise except BaseException as e: # BaseException (not Exception) for parity with the async iterator: @@ -95,8 +98,17 @@ class SyncResponseContextIterator(_utilities.SyncResponseIterator): # SessionBusy. if self._on_error: self._on_error(e) + self._call_on_finish(e) raise + def _call_on_finish(self, exception=None): + if self._on_finish is not None: + self._on_finish(exception) + self._on_finish = None + + def __del__(self): + self._call_on_finish() + def __exit__(self, exc_type, exc_val, exc_tb): # To close stream on YDB it is necessary to scroll through it to the end. # Errors during the cleanup drain have already been reported to _on_error @@ -107,6 +119,7 @@ class SyncResponseContextIterator(_utilities.SyncResponseIterator): pass except BaseException: pass + self._call_on_finish() class QueryClientSettings: diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py index f2099f8cdbf..a9c1b4a50d9 100644 --- a/contrib/python/ydb/py3/ydb/query/session.py +++ b/contrib/python/ydb/py3/ydb/query/session.py @@ -18,6 +18,7 @@ from . import base from .base import QueryExplainResultFormat from .. import _apis, issues, _utilities +from ..opentelemetry.tracing import SpanName, create_ydb_span, set_peer_attributes, span_finish_callback from ..settings import BaseRequestSettings from ..connection import _RpcState as RpcState, EndpointKey from .._grpc.grpcwrapper import common_utils @@ -30,7 +31,7 @@ from .transaction import QueryTxContext from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT if TYPE_CHECKING: - from ..driver import Driver as SyncDriver + from ..driver import Driver as SyncDriver, DriverConfig from ..aio.driver import Driver as AsyncDriver @@ -46,9 +47,30 @@ def wrapper_create_session( issues._process_response(message.status) session._session_id = message.session_id session._node_id = message.node_id + session._peer = _resolve_peer(session._driver, message.node_id) return session +def _resolve_peer(driver, node_id): + """Look up network.peer.* / ydb.node.dc for a node in the driver's endpoint map.""" + if node_id is None: + return None + store = getattr(driver, "_store", None) + if store is None: + return None + by_node = getattr(store, "connections_by_node_id", None) + if not by_node: + return None + connection = by_node.get(node_id) + if connection is None: + return None + return ( + getattr(connection, "peer_address", None), + getattr(connection, "peer_port", None), + getattr(connection, "peer_location", None), + ) + + def wrapper_delete_session( rpc_state: RpcState, response_pb: _apis.ydb_query.DeleteSessionResponse, @@ -69,6 +91,7 @@ class BaseQuerySession(abc.ABC, Generic[DriverT]): # Session data _session_id: Optional[str] = None _node_id: Optional[int] = None + _peer: Optional[tuple] = None _closed: bool = False _invalidated: bool = False @@ -84,6 +107,10 @@ class BaseQuerySession(abc.ABC, Generic[DriverT]): self._last_query_stats = None + @property + def _driver_config(self) -> Optional["DriverConfig"]: + return getattr(self._driver, "_driver_config", None) + @property def session_id(self) -> Optional[str]: return self._session_id @@ -391,8 +418,10 @@ class QuerySession(BaseQuerySession["SyncDriver"]): if self._closed: raise RuntimeError("Session is already closed.") - self._create_call(settings=settings) - self._attach() + with create_ydb_span(SpanName.CREATE_SESSION, self._driver_config).attach_context() as span: + self._create_call(settings=settings) + set_peer_attributes(span, self._peer) + self._attach() return self @@ -458,20 +487,27 @@ class QuerySession(BaseQuerySession["SyncDriver"]): """ self._check_session_ready_to_use() - stream_it = self._execute_call( - query=query, - parameters=parameters, - commit_tx=True, - syntax=syntax, - exec_mode=exec_mode, - stats_mode=stats_mode, - schema_inclusion_mode=schema_inclusion_mode, - result_set_format=result_set_format, - arrow_format_settings=arrow_format_settings, - concurrent_result_sets=concurrent_result_sets, - settings=settings, + span = create_ydb_span( + SpanName.EXECUTE_QUERY, + self._driver_config, + node_id=self._node_id, + peer=self._peer, ) + with span.attach_context(end_on_exit=False): + stream_it = self._execute_call( + query=query, + parameters=parameters, + commit_tx=True, + syntax=syntax, + exec_mode=exec_mode, + stats_mode=stats_mode, + schema_inclusion_mode=schema_inclusion_mode, + result_set_format=result_set_format, + arrow_format_settings=arrow_format_settings, + concurrent_result_sets=concurrent_result_sets, + settings=settings, + ) return base.SyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -481,6 +517,7 @@ class QuerySession(BaseQuerySession["SyncDriver"]): settings=self._settings, ), on_error=self._on_execute_stream_error, + on_finish=span_finish_callback(span), ) def explain( diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py index fdcefb0b801..1d278ac2fee 100644 --- a/contrib/python/ydb/py3/ydb/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/query/transaction.py @@ -17,6 +17,7 @@ from .. import ( _apis, issues, ) +from ..opentelemetry.tracing import SpanName, create_ydb_span, span_finish_callback from .._grpc.grpcwrapper import ydb_topic as _ydb_topic from .._grpc.grpcwrapper import ydb_query as _ydb_query from ..connection import _RpcState as RpcState @@ -244,6 +245,10 @@ class BaseQueryTxContext(base.CallbackHandler, Generic[DriverT]): self._external_error = None self._last_query_stats = None + @property + def _driver_config(self): + return getattr(self._driver, "_driver_config", None) + @property def session_id(self) -> Optional[str]: """ @@ -523,7 +528,13 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]): :return: Transaction object or exception if begin is failed """ - self._begin_call(settings) + with create_ydb_span( + SpanName.BEGIN_TRANSACTION, + self._driver_config, + node_id=self.session.node_id, + peer=getattr(self.session, "_peer", None), + ).attach_context(): + self._begin_call(settings) return self @@ -545,13 +556,19 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]): self._ensure_prev_stream_finished() - try: - self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT) - self._commit_call(settings) - self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None) - except BaseException as e: # TODO: probably should be less wide - self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e) - raise e + with create_ydb_span( + SpanName.COMMIT, + self._driver_config, + node_id=self.session.node_id, + peer=getattr(self.session, "_peer", None), + ).attach_context(): + try: + self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT) + self._commit_call(settings) + self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None) + except BaseException as e: # TODO: probably should be less wide + self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e) + raise e def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution @@ -571,13 +588,19 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]): self._ensure_prev_stream_finished() - try: - self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK) - self._rollback_call(settings) - self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None) - except BaseException as e: # TODO: probably should be less wide - self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e) - raise e + with create_ydb_span( + SpanName.ROLLBACK, + self._driver_config, + node_id=self.session.node_id, + peer=getattr(self.session, "_peer", None), + ).attach_context(): + try: + self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK) + self._rollback_call(settings) + self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None) + except BaseException as e: # TODO: probably should be less wide + self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e) + raise e def execute( self, @@ -626,20 +649,27 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]): """ self._ensure_prev_stream_finished() - stream_it = self._execute_call( - query=query, - commit_tx=commit_tx, - syntax=syntax, - exec_mode=exec_mode, - stats_mode=stats_mode, - schema_inclusion_mode=schema_inclusion_mode, - result_set_format=result_set_format, - arrow_format_settings=arrow_format_settings, - parameters=parameters, - concurrent_result_sets=concurrent_result_sets, - settings=settings, + span = create_ydb_span( + SpanName.EXECUTE_QUERY, + self._driver_config, + node_id=self.session.node_id, + peer=getattr(self.session, "_peer", None), ) + with span.attach_context(end_on_exit=False): + stream_it = self._execute_call( + query=query, + commit_tx=commit_tx, + syntax=syntax, + exec_mode=exec_mode, + stats_mode=stats_mode, + schema_inclusion_mode=schema_inclusion_mode, + result_set_format=result_set_format, + arrow_format_settings=arrow_format_settings, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + settings=settings, + ) self._prev_stream = base.SyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -651,5 +681,6 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]): settings=self.session._settings, ), on_error=self.session._on_execute_stream_error, + on_finish=span_finish_callback(span), ) return self._prev_stream diff --git a/contrib/python/ydb/py3/ydb/resolver.py b/contrib/python/ydb/py3/ydb/resolver.py index e9e8e6f93f3..1402e542abc 100644 --- a/contrib/python/ydb/py3/ydb/resolver.py +++ b/contrib/python/ydb/py3/ydb/resolver.py @@ -53,7 +53,11 @@ class EndpointInfo(object): ssl_target_name_override = self.address endpoint_options = conn_impl.EndpointOptions( - ssl_target_name_override=ssl_target_name_override, node_id=self.node_id + ssl_target_name_override=ssl_target_name_override, + node_id=self.node_id, + address=self.address, + port=self.port, + location=self.location, ) if self.ipv6_addrs or self.ipv4_addrs: diff --git a/contrib/python/ydb/py3/ydb/retries.py b/contrib/python/ydb/py3/ydb/retries.py index c151e3d2146..4b7c137f39e 100644 --- a/contrib/python/ydb/py3/ydb/retries.py +++ b/contrib/python/ydb/py3/ydb/retries.py @@ -7,6 +7,11 @@ from typing import Any, Callable, Generator, Optional, Union from . import issues from ._errors import check_retriable_error +from .opentelemetry.tracing import SpanName, create_span as _create_span + + +def _try_span_attrs(backoff_ms: Optional[int]): + return {"ydb.retry.backoff_ms": backoff_ms} if backoff_ms is not None else None class BackoffSettings: @@ -129,19 +134,18 @@ def retry_operation_impl( if not retriable_info.is_retriable: raise - skip_yield_error_types = [ + skip_yield_error_types = ( issues.Aborted, issues.BadSession, issues.NotFound, issues.InternalError, - ] - - yield_sleep = True - for t in skip_yield_error_types: - if isinstance(e, t): - yield_sleep = False + ) - if yield_sleep: + if isinstance(e, skip_yield_error_types): + # Skip the inter-attempt sleep but still emit a marker so consumers + # advance per-attempt bookkeeping (e.g. ``ydb.Try`` spans get backoff=0). + yield YdbRetryOperationSleepOpt(0.0) + else: yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds) except Exception as e: @@ -159,12 +163,21 @@ def retry_operation_sync( *args: Any, **kwargs: Any, ) -> Any: - opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs) - for next_opt in opt_generator: - if isinstance(next_opt, YdbRetryOperationSleepOpt): - time.sleep(next_opt.timeout) - else: - return next_opt.result + backoff_ms: Optional[int] = None + + @functools.wraps(callee) + def traced_callee(*a: Any, **kw: Any) -> Any: + with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)): + return callee(*a, **kw) + + with _create_span(SpanName.RUN_WITH_RETRY): + for next_opt in retry_operation_impl(traced_callee, retry_settings, *args, **kwargs): + if isinstance(next_opt, YdbRetryOperationSleepOpt): + backoff_ms = int(next_opt.timeout * 1000) + if next_opt.timeout > 0: + time.sleep(next_opt.timeout) + else: + return next_opt.result return None @@ -186,15 +199,20 @@ async def retry_operation_async( # pylint: disable=W1113 Returns awaitable result of coroutine. If retries are not succussful exception is raised. """ - opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs) - for next_opt in opt_generator: - if isinstance(next_opt, YdbRetryOperationSleepOpt): - await asyncio.sleep(next_opt.timeout) - else: - try: - return await next_opt.result - except BaseException as e: # pylint: disable=W0703 - next_opt.set_exception(e) + backoff_ms: Optional[int] = None + with _create_span(SpanName.RUN_WITH_RETRY): + for next_opt in retry_operation_impl(callee, retry_settings, *args, **kwargs): + if isinstance(next_opt, YdbRetryOperationSleepOpt): + backoff_ms = int(next_opt.timeout * 1000) + if next_opt.timeout > 0: + await asyncio.sleep(next_opt.timeout) + else: + with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)) as try_span: + try: + return await next_opt.result + except BaseException as e: # pylint: disable=W0703 + try_span.set_error(e) + next_opt.set_exception(e) return None diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index d7d1d1869be..989938e1be0 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.28.4" +VERSION = "3.29.6" -- cgit v1.3