summaryrefslogtreecommitdiffstats
path: root/contrib/python
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2026-07-01 11:17:19 +0300
committerrobot-piglet <[email protected]>2026-07-01 12:39:51 +0300
commit2135a97d85fc48400cc3d0942311be98bc9148da (patch)
tree81674cdb76e867ff8d891d8ab046bc0622397a97 /contrib/python
parentf26b401bceaf9d119e31b45568c41188def5176c (diff)
Intermediate changes
commit_hash:f46c06c29696f6cfd01e306372ca9b430bd6b9e3
Diffstat (limited to 'contrib/python')
-rw-r--r--contrib/python/ydb/py3/.dist-info/METADATA5
-rw-r--r--contrib/python/ydb/py3/README.md1
-rw-r--r--contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch5
-rw-r--r--contrib/python/ydb/py3/ya.make6
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py131
-rw-r--r--contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py37
-rw-r--r--contrib/python/ydb/py3/ydb/aio/connection.py19
-rw-r--r--contrib/python/ydb/py3/ydb/aio/driver.py1
-rw-r--r--contrib/python/ydb/py3/ydb/aio/pool.py40
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/base.py16
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/session.py39
-rw-r--r--contrib/python/ydb/py3/ydb/aio/query/transaction.py81
-rw-r--r--contrib/python/ydb/py3/ydb/aio/table.py3
-rw-r--r--contrib/python/ydb/py3/ydb/connection.py26
-rw-r--r--contrib/python/ydb/py3/ydb/convert.py53
-rw-r--r--contrib/python/ydb/py3/ydb/opentelemetry/__init__.py36
-rw-r--r--contrib/python/ydb/py3/ydb/opentelemetry/plugin.py134
-rw-r--r--contrib/python/ydb/py3/ydb/opentelemetry/tracing.py165
-rw-r--r--contrib/python/ydb/py3/ydb/pool.py51
-rw-r--r--contrib/python/ydb/py3/ydb/query/base.py17
-rw-r--r--contrib/python/ydb/py3/ydb/query/session.py67
-rw-r--r--contrib/python/ydb/py3/ydb/query/transaction.py85
-rw-r--r--contrib/python/ydb/py3/ydb/resolver.py6
-rw-r--r--contrib/python/ydb/py3/ydb/retries.py64
-rw-r--r--contrib/python/ydb/py3/ydb/ydb_version.py2
25 files changed, 894 insertions, 196 deletions
diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA
index 3c0337ed5e0..809298530a7 100644
--- a/contrib/python/ydb/py3/.dist-info/METADATA
+++ b/contrib/python/ydb/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
-Version: 3.28.4
+Version: 3.29.6
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
@@ -21,6 +21,8 @@ Requires-Dist: grpcio >=1.42.0
Requires-Dist: packaging
Requires-Dist: protobuf <7.0.0,>=3.13.0
Requires-Dist: aiohttp <4
+Provides-Extra: opentelemetry
+Requires-Dist: opentelemetry-api >=1.0.0 ; extra == 'opentelemetry'
Provides-Extra: yc
Requires-Dist: yandexcloud ; extra == 'yc'
@@ -31,6 +33,7 @@ YDB Python SDK
[![API Reference](https://img.shields.io/badge/API-Reference-lightgreen.svg)](https://ydb-platform.github.io/ydb-python-sdk)
[![Functional tests](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml)
[![Style checks](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml)
+[![codecov](https://codecov.io/gh/ydb-platform/ydb-python-sdk/graph/badge.svg)](https://codecov.io/gh/ydb-platform/ydb-python-sdk)
Officially supported Python client for YDB.
diff --git a/contrib/python/ydb/py3/README.md b/contrib/python/ydb/py3/README.md
index 7842bd1d910..eeda8db3b42 100644
--- a/contrib/python/ydb/py3/README.md
+++ b/contrib/python/ydb/py3/README.md
@@ -5,6 +5,7 @@ YDB Python SDK
[![API Reference](https://img.shields.io/badge/API-Reference-lightgreen.svg)](https://ydb-platform.github.io/ydb-python-sdk)
[![Functional tests](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/tests.yaml)
[![Style checks](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml/badge.svg)](https://github.com/ydb-platform/ydb-python-sdk/actions/workflows/style.yaml)
+[![codecov](https://codecov.io/gh/ydb-platform/ydb-python-sdk/graph/badge.svg)](https://codecov.io/gh/ydb-platform/ydb-python-sdk)
Officially supported Python client for YDB.
diff --git a/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch b/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch
index a865f95a382..156c6d7799e 100644
--- a/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch
+++ b/contrib/python/ydb/py3/patches/03-change-inport-protobufs.patch
@@ -87,10 +87,11 @@
from ..common.protos import ydb_topic_pb2
--- contrib/python/ydb/py3/ydb/aio/connection.py (index)
+++ contrib/python/ydb/py3/ydb/aio/connection.py (working tree)
-@@ -27,11 +27,10 @@ from ydb.driver import DriverConfig
+@@ -28,12 +28,11 @@ from ydb import issues
from ydb.settings import BaseRequestSettings
from ydb import issues
-
+ from ydb.opentelemetry.tracing import get_trace_metadata
+
-# Workaround for good IDE and universal for runtime
-if TYPE_CHECKING:
- from ydb._grpc.v4 import ydb_topic_v1_pb2_grpc
diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make
index b75adf12a55..5c39099e85f 100644
--- a/contrib/python/ydb/py3/ya.make
+++ b/contrib/python/ydb/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(3.28.4)
+VERSION(3.29.6)
LICENSE(Apache-2.0)
@@ -16,6 +16,7 @@ PEERDIR(
NO_LINT()
NO_CHECK_IMPORTS(
+ ydb.opentelemetry.plugin
ydb.public.api.grpc
ydb.public.api.grpc.*
)
@@ -104,6 +105,9 @@ PY_SRCS(
ydb/oauth2_token_exchange/__init__.py
ydb/oauth2_token_exchange/token_exchange.py
ydb/oauth2_token_exchange/token_source.py
+ ydb/opentelemetry/__init__.py
+ ydb/opentelemetry/plugin.py
+ ydb/opentelemetry/tracing.py
ydb/operation.py
ydb/pool.py
ydb/query/__init__.py
diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
index f4e5a4f88a5..ee988a1ec3b 100644
--- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py
@@ -217,6 +217,7 @@ class ReaderReconnector:
_stream_reader: Optional["ReaderStream"]
_first_error: asyncio.Future[YdbError]
_tx_to_batches_map: Dict[str, typing.List[datatypes.PublicBatch]]
+ _closed: bool
def __init__(
self,
@@ -233,6 +234,7 @@ class ReaderReconnector:
self._state_changed = asyncio.Event()
self._stream_reader = None
+ self._closed = False
self._background_tasks.add(asyncio.create_task(self._connection_loop()))
self._first_error = asyncio.get_running_loop().create_future()
@@ -241,6 +243,8 @@ class ReaderReconnector:
async def _connection_loop(self):
attempt = 0
while True:
+ if self._closed:
+ return
try:
logger.debug("reader %s connect attempt %s", self._id, attempt)
self._stream_reader = await ReaderStream.create(self._id, self._driver, self._settings)
@@ -266,8 +270,12 @@ class ReaderReconnector:
# noinspection PyBroadException
try:
await self._stream_reader.close(flush=False)
- except BaseException:
- # supress any error on close stream reader
+ except asyncio.CancelledError:
+ # propagate cancellation (e.g. from reader.close()) so the loop stops
+ # instead of swallowing it and reconnecting into a zombie stream
+ raise
+ except Exception:
+ # suppress any error on close stream reader
pass
async def wait_message(self):
@@ -323,14 +331,44 @@ class ReaderReconnector:
tx._add_callback(TxEvent.AFTER_COMMIT, self._handle_after_tx_commit, self._loop)
tx._add_callback(TxEvent.AFTER_ROLLBACK, self._handle_after_tx_rollback, self._loop)
+ def _batch_partition_session_expired(self, batch: datatypes.PublicBatch) -> bool:
+ # A batch is expired if the reader reconnected after it was received: its partition
+ # session no longer belongs to the current stream. Mirrors the guard in
+ # ReaderStream.commit() for the non-transactional commit path.
+ stream = self._stream_reader
+ partition_session = batch._partition_session
+ return (
+ stream is None
+ or partition_session.reader_stream_id != stream._id
+ or partition_session.id not in stream._partition_sessions
+ )
+
async def _commit_batches_with_tx(self, tx: "BaseQueryTxContext"):
tx_id = tx.tx_id
if tx_id is None:
raise TopicReaderError("Transaction ID is None")
+
+ batches = self._tx_to_batches_map[tx_id]
+
+ if any(self._batch_partition_session_expired(batch) for batch in batches):
+ # The reader reconnected between receive_batch_with_tx() and tx.commit(), so
+ # these offsets belong to a partition session that no longer exists. Committing
+ # them would send a stale/gapped range (server "Gap", issue_code 2011) while the
+ # client believes the commit succeeded. Fail the tx instead (retriable) without
+ # sending the request; the AFTER_COMMIT handler then reconnects to reset the
+ # read-ahead state, and the pool re-reads from the committed offset.
+ err = issues.ClientInternalError(
+ "Topic reader partition session expired before tx commit; "
+ "offsets were not committed, the transaction will be retried"
+ )
+ tx._set_external_error(err)
+ del self._tx_to_batches_map[tx_id]
+ return
+
grouped_batches: Dict[str, Dict[int, typing.List[datatypes.PublicBatch]]] = defaultdict(
lambda: defaultdict(list)
)
- for batch in self._tx_to_batches_map[tx_id]:
+ for batch in batches:
grouped_batches[batch._partition_session.topic_path][batch._partition_session.partition_id].append(batch)
consumer = self._settings.consumer
@@ -401,8 +439,16 @@ class ReaderReconnector:
async def close(self, flush: bool):
logger.debug("reader reconnector %s close", self._id)
+ # Mark closed so the connection loop won't start a new stream, then close the
+ # current stream with the requested flush before cancelling the loop. On a normal
+ # close this flushes pending commits; cancelling the loop first would let it close
+ # the stream with flush=False instead and skip the flush.
+ self._closed = True
if self._stream_reader:
await self._stream_reader.close(flush)
+ # Wake any pending wait_message() waiter (e.g. a concurrent receive) so it doesn't
+ # hang if the loop was reconnecting when close() cancelled it.
+ self._set_first_error(TopicReaderStreamClosedError())
for task in self._background_tasks:
task.cancel()
@@ -469,6 +515,11 @@ class ReaderStream:
self._id = ReaderStream._static_id_counter.inc_and_get()
self._reader_reconnector_id = reader_reconnector_id
self._session_id = "not initialized"
+ self._log_prefix = "reader %s stream %s session=%s" % (
+ self._reader_reconnector_id,
+ self._id,
+ self._session_id,
+ )
self._stream = None
self._started = False
self._background_tasks = set()
@@ -503,17 +554,28 @@ class ReaderStream:
settings: topic_reader.PublicReaderSettings,
) -> "ReaderStream":
stream = GrpcWrapperAsyncIO(StreamReadMessage.FromServer.from_proto)
+ reader = None
+ try:
+ await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamRead)
- await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamRead)
-
- creds = driver._credentials
- reader = ReaderStream(
- reader_reconnector_id,
- settings,
- get_token_function=creds.get_auth_token if creds else None,
- )
- await reader._start(stream, settings._init_message())
- logger.debug("reader stream %s started session=%s", reader._id, reader._session_id)
+ creds = driver._credentials
+ reader = ReaderStream(
+ reader_reconnector_id,
+ settings,
+ get_token_function=creds.get_auth_token if creds else None,
+ )
+ await reader._start(stream, settings._init_message())
+ except BaseException:
+ # If create() is interrupted (e.g. reader.close() cancels the connection loop
+ # mid-reconnect) the in-flight stream is not yet assigned to the reconnector, so
+ # its finally cannot reach it. Close it here to avoid a zombie gRPC read session
+ # that keeps holding the consumer's partition on the server.
+ if reader is not None:
+ await reader.close(flush=False)
+ else:
+ stream.close()
+ raise
+ logger.debug("%s started", reader._log_prefix)
return reader
async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest):
@@ -522,7 +584,7 @@ class ReaderStream:
self._started = True
self._stream = stream
- logger.debug("reader stream %s send init request", self._id)
+ logger.debug("%s send init request", self._log_prefix)
stream.write(StreamReadMessage.FromClient(client_message=init_message))
try:
@@ -534,7 +596,12 @@ class ReaderStream:
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
self._session_id = init_response.server_message.session_id
- logger.debug("reader stream %s initialized session=%s", self._id, self._session_id)
+ self._log_prefix = "reader %s stream %s session=%s" % (
+ self._reader_reconnector_id,
+ self._id,
+ self._session_id,
+ )
+ logger.debug("%s initialized", self._log_prefix)
else:
raise TopicReaderError("Unexpected message after InitRequest: %s" % init_response)
@@ -668,7 +735,7 @@ class ReaderStream:
async def _read_messages_loop(self):
try:
- logger.debug("reader stream %s start read loop", self._id)
+ logger.debug("%s start read loop", self._log_prefix)
self._stream.write(
StreamReadMessage.FromClient(
client_message=StreamReadMessage.ReadRequest(
@@ -682,7 +749,7 @@ class ReaderStream:
_process_response(message.server_status)
if isinstance(message.server_message, StreamReadMessage.ReadResponse):
- logger.debug("reader stream %s read %s bytes", self._id, message.server_message.bytes_size)
+ logger.debug("%s read %s bytes", self._log_prefix, message.server_message.bytes_size)
self._on_read_response(message.server_message)
elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
@@ -693,8 +760,8 @@ class ReaderStream:
StreamReadMessage.StartPartitionSessionRequest,
):
logger.debug(
- "reader stream %s start partition %s",
- self._id,
+ "%s start partition %s",
+ self._log_prefix,
message.server_message.partition_session.partition_session_id,
)
await self._on_start_partition_session(message.server_message)
@@ -704,8 +771,8 @@ class ReaderStream:
StreamReadMessage.StopPartitionSessionRequest,
):
logger.debug(
- "reader stream %s stop partition %s",
- self._id,
+ "%s stop partition %s",
+ self._log_prefix,
message.server_message.partition_session_id,
)
self._on_partition_session_stop(message.server_message)
@@ -715,8 +782,8 @@ class ReaderStream:
StreamReadMessage.EndPartitionSession,
):
logger.debug(
- "reader stream %s end partition %s",
- self._id,
+ "%s end partition %s",
+ self._log_prefix,
message.server_message.partition_session_id,
)
self._on_end_partition_session(message.server_message)
@@ -733,12 +800,12 @@ class ReaderStream:
self._state_changed.set()
except asyncio.CancelledError as e:
- logger.debug("reader stream %s error: %s", self._id, e)
+ logger.debug("%s error: %s", self._log_prefix, e)
if not self._closed:
self._set_first_error(issues.ConnectionLost("gRPC stream cancelled"))
raise
except Exception as e:
- logger.debug("reader stream %s error: %s", self._id, e)
+ logger.debug("%s error: %s", self._log_prefix, e)
self._set_first_error(e)
return
@@ -908,7 +975,7 @@ class ReaderStream:
async def _decode_batches_loop(self):
while True:
batch = await self._batches_to_decode.get()
- logger.debug("reader stream %s decode batch %s messages", self._id, len(batch.messages))
+ logger.debug("%s decode batch %s messages", self._log_prefix, len(batch.messages))
await self._decode_batch_inplace(batch)
self._add_batch_to_queue(batch)
self._state_changed.set()
@@ -918,8 +985,8 @@ class ReaderStream:
if part_sess_id in self._message_batches:
self._message_batches[part_sess_id]._extend(batch)
logger.debug(
- "reader stream %s extend batch partition=%s size=%s",
- self._id,
+ "%s extend batch partition=%s size=%s",
+ self._log_prefix,
part_sess_id,
len(batch.messages),
)
@@ -927,8 +994,8 @@ class ReaderStream:
self._message_batches[part_sess_id] = batch
logger.debug(
- "reader stream %s new batch partition=%s size=%s",
- self._id,
+ "%s new batch partition=%s size=%s",
+ self._log_prefix,
part_sess_id,
len(batch.messages),
)
@@ -979,7 +1046,7 @@ class ReaderStream:
return
self._closed = True
- logger.debug("reader stream %s close", self._id)
+ logger.debug("%s close", self._log_prefix)
if flush:
await self.flush()
@@ -999,4 +1066,4 @@ class ReaderStream:
if self._background_tasks:
await asyncio.wait(self._background_tasks)
- logger.debug("reader stream %s was closed", self._id)
+ logger.debug("%s was closed", self._log_prefix)
diff --git a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
index 5e55917d830..ff9c59f7989 100644
--- a/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
+++ b/contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py
@@ -833,7 +833,8 @@ class WriterAsyncIOStream:
self._update_token_task.cancel()
await asyncio.wait([self._update_token_task])
- self._stream.close()
+ if getattr(self, "_stream", None) is not None:
+ self._stream.close()
logger.debug("writer stream %s was closed", self._id)
@staticmethod
@@ -845,15 +846,27 @@ class WriterAsyncIOStream:
) -> "WriterAsyncIOStream":
stream = GrpcWrapperAsyncIO(StreamWriteMessage.FromServer.from_proto)
- await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamWrite)
+ writer = None
+ try:
+ await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamWrite)
- creds = driver._credentials
- writer = WriterAsyncIOStream(
- update_token_interval=update_token_interval,
- get_token_function=creds.get_auth_token if creds else lambda: "",
- tx_identity=tx_identity,
- )
- await writer._start(stream, init_request)
+ creds = driver._credentials
+ writer = WriterAsyncIOStream(
+ update_token_interval=update_token_interval,
+ get_token_function=creds.get_auth_token if creds else lambda: "",
+ tx_identity=tx_identity,
+ )
+ await writer._start(stream, init_request)
+ except BaseException:
+ # If create() fails after stream.start() (e.g. the connection is lost while
+ # waiting for the init response), the stream is not yet handed to the
+ # reconnector, so its finally cannot reach it. Close it here to avoid a
+ # stranded gRPC consumption thread that blocks forever on the request queue.
+ if writer is not None:
+ await writer.close()
+ else:
+ stream.close()
+ raise
logger.debug(
"writer stream %s started seqno=%s",
writer._id,
@@ -875,6 +888,10 @@ class WriterAsyncIOStream:
raise Exception("Unknown message while read writer answers: %s" % item)
async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMessage.InitRequest):
+ # Assign before the init handshake (mirrors ReaderStream._start) so that if _start
+ # fails mid-handshake, close() can still reach the stream and release its gRPC thread.
+ self._stream = stream
+
logger.debug("writer stream %s send init request", self._id)
stream.write(StreamWriteMessage.FromClient(init_message))
@@ -895,8 +912,6 @@ class WriterAsyncIOStream:
self.last_seqno,
)
- self._stream = stream
-
if self._update_token_interval is not None:
self._update_token_event.set()
self._update_token_task = asyncio.create_task(self._update_token_loop())
diff --git a/contrib/python/ydb/py3/ydb/aio/connection.py b/contrib/python/ydb/py3/ydb/aio/connection.py
index 3cb6290e1d4..02320bf9afc 100644
--- a/contrib/python/ydb/py3/ydb/aio/connection.py
+++ b/contrib/python/ydb/py3/ydb/aio/connection.py
@@ -26,6 +26,7 @@ from ydb.connection import (
from ydb.driver import DriverConfig
from ydb.settings import BaseRequestSettings
from ydb import issues
+from ydb.opentelemetry.tracing import get_trace_metadata
try:
from ydb.public.api.grpc import ydb_topic_v1_pb2_grpc
@@ -70,6 +71,9 @@ async def _construct_metadata(
metadata.append((YDB_REQUEST_TYPE_HEADER, settings.request_type))
metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ())))
+
+ metadata.extend(get_trace_metadata())
+
return metadata
@@ -148,6 +152,9 @@ class Connection:
"closing",
"endpoint_key",
"node_id",
+ "peer_address",
+ "peer_port",
+ "peer_location",
)
def __init__(
@@ -156,10 +163,12 @@ class Connection:
driver_config: Optional[DriverConfig] = None,
endpoint_options: Optional[EndpointOptions] = None,
) -> None:
- global _stubs_list
self.endpoint = endpoint
self.endpoint_key = EndpointKey(self.endpoint, getattr(endpoint_options, "node_id", None))
self.node_id = getattr(endpoint_options, "node_id", None)
+ self.peer_address = getattr(endpoint_options, "address", None)
+ self.peer_port = getattr(endpoint_options, "port", None)
+ self.peer_location = getattr(endpoint_options, "location", None)
self._channel = channel_factory(self.endpoint, driver_config, grpc.aio, endpoint_options=endpoint_options)
self._driver_config = driver_config
@@ -248,8 +257,12 @@ class Connection:
:param grace:
:return: None
"""
- if hasattr(self, "_channel") and hasattr(self._channel, "close"):
- await self._channel.close(grace)
+ channel = getattr(self, "_channel", None)
+ if channel is not None and hasattr(channel, "close"):
+ await channel.close(grace)
+
+ self._stub_instances.clear()
+ self._channel = None
def add_cleanup_callback(self, callback: Callable[["Connection"], None]) -> None:
self._cleanup_callbacks.append(callback)
diff --git a/contrib/python/ydb/py3/ydb/aio/driver.py b/contrib/python/ydb/py3/ydb/aio/driver.py
index 405e5fcbf37..88221e947db 100644
--- a/contrib/python/ydb/py3/ydb/aio/driver.py
+++ b/contrib/python/ydb/py3/ydb/aio/driver.py
@@ -69,6 +69,7 @@ class Driver(pool.ConnectionPool):
root_certificates,
credentials,
config_class=DriverConfig,
+ **kwargs,
)
super(Driver, self).__init__(config)
diff --git a/contrib/python/ydb/py3/ydb/aio/pool.py b/contrib/python/ydb/py3/ydb/aio/pool.py
index fe7091332ce..4d952086c55 100644
--- a/contrib/python/ydb/py3/ydb/aio/pool.py
+++ b/contrib/python/ydb/py3/ydb/aio/pool.py
@@ -6,6 +6,7 @@ import random
from typing import Any, Callable, Optional, Tuple, TYPE_CHECKING
from ydb import issues
+from ydb.opentelemetry.tracing import SpanName, create_ydb_span
from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool
from .connection import Connection, EndpointKey
@@ -247,17 +248,36 @@ class ConnectionPool(IConnectionPool):
self._store = ConnectionsCache(driver_config.use_all_nodes)
self._grpc_init = Connection(self._driver_config.endpoint, self._driver_config)
self._stopped = False
+ self._stopping = False
self._discovery: Optional[Discovery] = None
self._discovery_task: "asyncio.Task[None]"
if driver_config.disable_discovery:
# If discovery is disabled, just add the initial endpoint to the store
async def init_connection() -> None:
- ready_connection = Connection(self._driver_config.endpoint, self._driver_config)
- await ready_connection.connection_ready(
- ready_timeout=getattr(self._driver_config, "discovery_request_timeout", 10)
- )
- self._store.add(ready_connection)
+ ready_timeout = getattr(self._driver_config, "discovery_request_timeout", 10)
+ while not self._stopping:
+ ready_connection = Connection(self._driver_config.endpoint, self._driver_config)
+ try:
+ await ready_connection.connection_ready(ready_timeout=ready_timeout)
+ except asyncio.CancelledError:
+ try:
+ await ready_connection.close()
+ except Exception:
+ logger.debug("Failed to close cancelled initial connection", exc_info=True)
+ raise
+ except Exception:
+ logger.debug("Initial connection attempt failed", exc_info=True)
+ try:
+ await ready_connection.close()
+ except Exception:
+ logger.debug("Failed to close unsuccessful initial connection", exc_info=True)
+ if not self._stopping:
+ await asyncio.sleep(1)
+ continue
+
+ self._store.add(ready_connection)
+ return
# Create and schedule the task to initialize the connection
self._discovery_task = asyncio.get_event_loop().create_task(init_connection())
@@ -267,6 +287,7 @@ class ConnectionPool(IConnectionPool):
self._discovery_task = asyncio.get_event_loop().create_task(self._discovery.run())
async def stop(self, timeout: int = 10) -> None: # type: ignore[override] # async override of sync method
+ self._stopping = True
if self._discovery:
self._discovery.stop()
await self._grpc_init.close()
@@ -274,6 +295,12 @@ class ConnectionPool(IConnectionPool):
await asyncio.wait_for(self._discovery_task, timeout=timeout)
except asyncio.TimeoutError:
self._discovery_task.cancel()
+ try:
+ await self._discovery_task
+ except asyncio.CancelledError:
+ pass
+ if self._discovery is None:
+ await self._store.cleanup()
self._stopped = True
def _on_disconnected(self, connection: Connection) -> Callable[[], Any]:
@@ -285,7 +312,8 @@ class ConnectionPool(IConnectionPool):
return __wrapper__
async def wait(self, timeout: Optional[float] = 7.0, fail_fast: bool = False) -> None: # type: ignore[override] # async override of sync method
- await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0)
+ with create_ydb_span(SpanName.DRIVER_INITIALIZE, self._driver_config, kind="internal").attach_context():
+ await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0)
def discovery_debug_details(self) -> str:
if self._discovery:
diff --git a/contrib/python/ydb/py3/ydb/aio/query/base.py b/contrib/python/ydb/py3/ydb/aio/query/base.py
index a1d6e5d74fc..6c13dfd400a 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/base.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/base.py
@@ -2,9 +2,12 @@ from .. import _utilities
class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
- def __init__(self, it, wrapper, on_error=None):
+ """Async ExecuteQuery result stream."""
+
+ def __init__(self, it, wrapper, on_error=None, on_finish=None):
super().__init__(it, wrapper)
self._on_error = on_error
+ self._on_finish = on_finish
async def __aenter__(self) -> "AsyncResponseContextIterator":
return self
@@ -15,6 +18,7 @@ class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
except StopAsyncIteration:
# Normal stream termination is not an error and must not invalidate
# the session.
+ self._call_on_finish()
raise
except BaseException as e:
# BaseException (not Exception) because asyncio.CancelledError
@@ -25,8 +29,17 @@ class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
# reply with SessionBusy.
if self._on_error:
self._on_error(e)
+ self._call_on_finish(e)
raise
+ def _call_on_finish(self, exception=None):
+ if self._on_finish is not None:
+ self._on_finish(exception)
+ self._on_finish = None
+
+ def __del__(self):
+ self._call_on_finish()
+
async def __aexit__(self, exc_type, exc_val, exc_tb):
# To close stream on YDB it is necessary to scroll through it to the end.
# Errors that happen during the cleanup drain have already been reported
@@ -39,3 +52,4 @@ class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
pass
except BaseException:
pass
+ self._call_on_finish()
diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py
index a565b266e82..b776b638268 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/session.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/session.py
@@ -19,6 +19,7 @@ from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
from ...query import base
from ...query.session import BaseQuerySession
+from ...opentelemetry.tracing import SpanName, create_ydb_span, set_peer_attributes, span_finish_callback
from ..._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
@@ -105,8 +106,10 @@ class QuerySession(BaseQuerySession["AsyncDriver"]):
if self._closed:
raise RuntimeError("Session is already closed")
- await self._create_call(settings=settings)
- await self._attach()
+ with create_ydb_span(SpanName.CREATE_SESSION, self._driver_config).attach_context() as span:
+ await self._create_call(settings=settings)
+ set_peer_attributes(span, self._peer)
+ await self._attach()
return self
@@ -159,20 +162,27 @@ class QuerySession(BaseQuerySession["AsyncDriver"]):
"""
self._check_session_ready_to_use()
- stream_it = await self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=True,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ SpanName.EXECUTE_QUERY,
+ self._driver_config,
+ node_id=self._node_id,
+ peer=self._peer,
)
+ with span.attach_context(end_on_exit=False):
+ stream_it = await self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
return AsyncResponseContextIterator(
it=stream_it,
wrapper=lambda resp: base.wrap_execute_query_response(
@@ -182,6 +192,7 @@ class QuerySession(BaseQuerySession["AsyncDriver"]):
settings=self._settings,
),
on_error=self._on_execute_stream_error,
+ on_finish=span_finish_callback(span),
)
async def explain(
diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py
index c31d79fb088..a05d91f2349 100644
--- a/contrib/python/ydb/py3/ydb/aio/query/transaction.py
+++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py
@@ -12,6 +12,7 @@ from ...query.transaction import (
BaseQueryTxContext,
QueryTxStateEnum,
)
+from ...opentelemetry.tracing import SpanName, create_ydb_span, span_finish_callback
if TYPE_CHECKING:
from .session import QuerySession
@@ -87,7 +88,13 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]):
:return: None or exception if begin is failed
"""
- await self._begin_call(settings)
+ with create_ydb_span(
+ SpanName.BEGIN_TRANSACTION,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ await self._begin_call(settings)
return self
async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None:
@@ -109,13 +116,19 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]):
await self._ensure_prev_stream_finished()
- try:
- await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT)
- await self._commit_call(settings)
- await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None)
- except BaseException as e:
- await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e)
- raise e
+ with create_ydb_span(
+ SpanName.COMMIT,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ try:
+ await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT)
+ await self._commit_call(settings)
+ await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None)
+ except BaseException as e:
+ await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e)
+ raise e
async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
@@ -136,13 +149,19 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]):
await self._ensure_prev_stream_finished()
- try:
- await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK)
- await self._rollback_call(settings)
- await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None)
- except BaseException as e:
- await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e)
- raise e
+ with create_ydb_span(
+ SpanName.ROLLBACK,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ try:
+ await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK)
+ await self._rollback_call(settings)
+ await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None)
+ except BaseException as e:
+ await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e)
+ raise e
async def execute(
self,
@@ -190,20 +209,27 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]):
"""
await self._ensure_prev_stream_finished()
- stream_it = await self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=commit_tx,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ SpanName.EXECUTE_QUERY,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
)
+ with span.attach_context(end_on_exit=False):
+ stream_it = await self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=commit_tx,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
self._prev_stream = AsyncResponseContextIterator(
it=stream_it,
wrapper=lambda resp: base.wrap_execute_query_response(
@@ -215,5 +241,6 @@ class QueryTxContext(BaseQueryTxContext["AsyncDriver"]):
settings=self.session._settings,
),
on_error=self.session._on_execute_stream_error,
+ on_finish=span_finish_callback(span),
)
return self._prev_stream
diff --git a/contrib/python/ydb/py3/ydb/aio/table.py b/contrib/python/ydb/py3/ydb/aio/table.py
index 0d14ba2f11b..8d5e02c180d 100644
--- a/contrib/python/ydb/py3/ydb/aio/table.py
+++ b/contrib/python/ydb/py3/ydb/aio/table.py
@@ -462,7 +462,8 @@ async def retry_operation(callee, retry_settings=None, *args, **kwargs): # pyli
opt_generator = ydb.retry_operation_impl(callee, retry_settings, *args, **kwargs)
for next_opt in opt_generator:
if isinstance(next_opt, ydb.YdbRetryOperationSleepOpt):
- await asyncio.sleep(next_opt.timeout)
+ if next_opt.timeout > 0:
+ await asyncio.sleep(next_opt.timeout)
else:
try:
return await next_opt.result
diff --git a/contrib/python/ydb/py3/ydb/connection.py b/contrib/python/ydb/py3/ydb/connection.py
index 98fbd5aab31..7e5ad3675fe 100644
--- a/contrib/python/ydb/py3/ydb/connection.py
+++ b/contrib/python/ydb/py3/ydb/connection.py
@@ -24,6 +24,7 @@ from google.protobuf import text_format
import grpc
from . import issues, _apis, _utilities
from . import default_pem
+from .opentelemetry.tracing import get_trace_metadata
_stubs_list = (
_apis.TableService.Stub,
@@ -179,6 +180,9 @@ def _construct_metadata(driver_config, settings):
metadata.extend(getattr(settings, "headers", []))
metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ())))
+
+ metadata.extend(get_trace_metadata())
+
return metadata
@@ -194,11 +198,14 @@ def _get_request_timeout(settings):
class EndpointOptions(object):
- __slots__ = ("ssl_target_name_override", "node_id")
+ __slots__ = ("ssl_target_name_override", "node_id", "address", "port", "location")
- def __init__(self, ssl_target_name_override=None, node_id=None):
+ def __init__(self, ssl_target_name_override=None, node_id=None, address=None, port=None, location=None):
self.ssl_target_name_override = ssl_target_name_override
self.node_id = node_id
+ self.address = address
+ self.port = port
+ self.location = location
def _construct_channel_options(driver_config, endpoint_options=None):
@@ -405,6 +412,9 @@ class Connection(object):
"closing",
"endpoint_key",
"node_id",
+ "peer_address",
+ "peer_port",
+ "peer_location",
)
def __init__(
@@ -419,9 +429,11 @@ class Connection(object):
discovered by the YDB endpoint discovery mechanism
:param driver_config: A driver config instance to be used for RPC call interception
"""
- global _stubs_list
self.endpoint = endpoint
self.node_id = getattr(endpoint_options, "node_id", None)
+ self.peer_address = getattr(endpoint_options, "address", None)
+ self.peer_port = getattr(endpoint_options, "port", None)
+ self.peer_location = getattr(endpoint_options, "location", None)
self.endpoint_key = EndpointKey(endpoint, getattr(endpoint_options, "node_id", None))
self._channel = channel_factory(self.endpoint, driver_config, endpoint_options=endpoint_options)
self._driver_config = driver_config
@@ -590,8 +602,12 @@ class Connection(object):
self.destroy()
def destroy(self):
- if hasattr(self, "_channel") and hasattr(self._channel, "close"):
- self._channel.close()
+ channel = getattr(self, "_channel", None)
+ if channel is not None and hasattr(channel, "close"):
+ channel.close()
+
+ self._stub_instances.clear()
+ self._channel = None
def ready_future(self):
"""
diff --git a/contrib/python/ydb/py3/ydb/convert.py b/contrib/python/ydb/py3/ydb/convert.py
index b54e8bddc8d..89f28ecc597 100644
--- a/contrib/python/ydb/py3/ydb/convert.py
+++ b/contrib/python/ydb/py3/ydb/convert.py
@@ -24,11 +24,21 @@ _initialize()
class _DotDict(dict):
+ # A lazy __dict__ is declared on purpose: it is not materialized until a row
+ # is written to (or its __dict__ is introspected), so untouched read-only
+ # rows avoid the per-instance dict overhead while callers can still attach
+ # their own attributes (ORM-style row.extra = ...), which materializes the
+ # dict for that row alone.
+ __slots__ = ("__dict__",)
+
def __init__(self, *args, **kwargs):
super(_DotDict, self).__init__(*args, **kwargs)
def __getattr__(self, item):
- return self[item]
+ try:
+ return self[item]
+ except KeyError:
+ raise AttributeError(item) from None
def _is_decimal_signed(hi_value):
@@ -83,7 +93,7 @@ def _pb_to_dict(type_pb, value_pb, table_client_settings):
class _Struct(_DotDict):
- pass
+ __slots__ = ()
def _pb_to_struct(type_pb, value_pb, table_client_settings):
@@ -351,6 +361,16 @@ def _unwrap_optionality(column):
return _to_native_map.get(current_type), c_type
+def _detach_columns(columns):
+ # The columns container references the source protobuf message, which keeps
+ # the whole arena (including already-parsed message.rows) alive for as long
+ # as the result set is held. Copy the schema into a standalone message so the
+ # source arena can be released right after conversion.
+ holder = _apis.ydb_value.ResultSet()
+ holder.columns.extend(columns)
+ return holder.columns
+
+
class _ResultSet(object):
__slots__ = ("columns", "rows", "truncated", "snapshot", "index", "format", "arrow_format_meta", "data")
@@ -375,12 +395,17 @@ class _ResultSet(object):
for column in message.columns:
column_parsers.append(_unwrap_optionality(column))
+ # Names are the only per-row metadata we need. Storing this shared tuple
+ # (instead of the proto columns) keeps rows detached from the source
+ # protobuf arena, so it can be freed once conversion is done.
+ column_names = tuple(column.name for column in message.columns)
+
for row_proto in message.rows:
- row = _Row(message.columns)
- for column, value, column_info in zip(message.columns, row_proto.items, column_parsers):
+ row = _Row(column_names)
+ for name, value, column_info in zip(column_names, row_proto.items, column_parsers):
v_type = value.WhichOneof("value")
if v_type == "null_flag_value":
- row[column.name] = None
+ row[name] = None
continue
while v_type == "nested_value":
@@ -388,7 +413,7 @@ class _ResultSet(object):
v_type = value.WhichOneof("value")
column_parser, unwrapped_type = column_info
- row[column.name] = column_parser(unwrapped_type, value, table_client_settings)
+ row[name] = column_parser(unwrapped_type, value, table_client_settings)
rows.append(row)
from ydb.query import QueryResultSetFormat, ArrowFormatMeta
@@ -401,12 +426,17 @@ class _ResultSet(object):
data = message.data if message.data else None
- return cls(message.columns, rows, message.truncated, snapshot, index, result_format, arrow_meta, data)
+ return cls(
+ _detach_columns(message.columns), rows, message.truncated, snapshot, index, result_format, arrow_meta, data
+ )
@classmethod
def lazy_from_message(cls, message, table_client_settings=None, snapshot=None):
from ydb.query import QueryResultSetFormat, ArrowFormatMeta
+ # No _detach_columns here on purpose: _LazyRows defers parsing and keeps
+ # message.rows, so the source arena is pinned regardless — detaching the
+ # schema would only add a copy without freeing anything.
rows = _LazyRows(message.rows, table_client_settings, message.columns)
result_format = message.format if message.format else QueryResultSetFormat.VALUE
@@ -422,15 +452,17 @@ ResultSet = _ResultSet
class _Row(_DotDict):
+ __slots__ = ("_columns",)
+
def __init__(self, columns):
super(_Row, self).__init__()
self._columns = columns
def __getitem__(self, key):
if isinstance(key, int):
- return self[self._columns[key].name]
+ return self[self._columns[key]]
elif isinstance(key, slice):
- return tuple(map(lambda x: self[x.name], self._columns[key]))
+ return tuple(self[name] for name in self._columns[key])
else:
return super(_Row, self).__getitem__(key)
@@ -455,10 +487,11 @@ class _LazyRowItem:
class _LazyRow(_DotDict):
+ __slots__ = ("_columns",)
+
def __init__(self, columns, proto_row, table_client_settings, parsers):
super(_LazyRow, self).__init__()
self._columns = columns
- self._table_client_settings = table_client_settings
for i, (column, row_item) in enumerate(zip(self._columns, proto_row.items)):
super(_LazyRow, self).__setitem__(
column.name,
diff --git a/contrib/python/ydb/py3/ydb/opentelemetry/__init__.py b/contrib/python/ydb/py3/ydb/opentelemetry/__init__.py
new file mode 100644
index 00000000000..fc058d0d866
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/opentelemetry/__init__.py
@@ -0,0 +1,36 @@
+"""Public OpenTelemetry entrypoints for YDB."""
+
+
+def enable_tracing(tracer=None):
+ """Enable OpenTelemetry trace context propagation and span creation for all YDB gRPC calls.
+
+ This call is **idempotent**: if tracing is already enabled, later calls do nothing
+ (including passing a different ``tracer``). Call :func:`disable_tracing` first to
+ reconfigure or turn instrumentation off.
+
+ Args:
+ tracer: Optional OTel tracer to use. If not provided, the default tracer named
+ ``ydb.sdk`` from the global tracer provider will be used.
+ """
+ try:
+ from ydb.opentelemetry.plugin import _enable_tracing
+ except ImportError:
+ raise ImportError(
+ "OpenTelemetry packages are required for tracing support. "
+ "Install them with: pip install ydb[opentelemetry]"
+ ) from None
+
+ _enable_tracing(tracer)
+
+
+def disable_tracing():
+ """Disable YDB OpenTelemetry hooks and allow :func:`enable_tracing` to run again."""
+ try:
+ from ydb.opentelemetry.plugin import _disable_tracing
+ except ImportError:
+ return
+
+ _disable_tracing()
+
+
+__all__ = ["disable_tracing", "enable_tracing"]
diff --git a/contrib/python/ydb/py3/ydb/opentelemetry/plugin.py b/contrib/python/ydb/py3/ydb/opentelemetry/plugin.py
new file mode 100644
index 00000000000..76942789f52
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/opentelemetry/plugin.py
@@ -0,0 +1,134 @@
+"""OpenTelemetry bridge for YDB."""
+
+from opentelemetry import context as otel_context
+from opentelemetry import trace
+from opentelemetry.propagate import inject
+from opentelemetry.trace import StatusCode
+
+from ydb import issues
+from ydb.issues import StatusCode as YdbStatusCode
+from ydb.opentelemetry.tracing import _registry
+
+# YDB client transport StatusCode values (401xxx band) -> OTel error.type transport_error.
+_TRANSPORT_STATUSES = frozenset(
+ {
+ YdbStatusCode.CONNECTION_LOST,
+ YdbStatusCode.CONNECTION_FAILURE,
+ YdbStatusCode.DEADLINE_EXCEEDED,
+ YdbStatusCode.CLIENT_INTERNAL_ERROR,
+ YdbStatusCode.UNIMPLEMENTED,
+ }
+)
+
+_tracer = None
+_enabled = False
+
+_KIND_MAP = {
+ "client": trace.SpanKind.CLIENT,
+ "internal": trace.SpanKind.INTERNAL,
+}
+
+
+def _otel_metadata_hook():
+ """Inject W3C Trace Context into outgoing gRPC metadata using the active OTel context."""
+ headers = {}
+ inject(headers)
+ return list(headers.items())
+
+
+def _set_error_on_span(span, exception):
+ if isinstance(exception, issues.Error) and exception.status is not None:
+ span.set_attribute("db.response.status_code", exception.status.name)
+ error_type = "transport_error" if exception.status in _TRANSPORT_STATUSES else "ydb_error"
+ else:
+ error_type = type(exception).__qualname__
+
+ span.set_attribute("error.type", error_type)
+ span.set_status(StatusCode.ERROR, str(exception))
+ span.record_exception(exception)
+
+
+class _AttachContext:
+ """Make a span the active OTel context for a ``with`` block.
+
+ When ``end_on_exit=True`` (default) the span is ended on exit — used for
+ single-shot RPCs. When ``end_on_exit=False`` the span is only ended on
+ exception — used for streaming RPCs where the result iterator owns ``end()``.
+ """
+
+ def __init__(self, span, end_on_exit):
+ self._span = span
+ self._end_on_exit = end_on_exit
+ self._token = None
+
+ def __enter__(self):
+ ctx = trace.set_span_in_context(self._span._span)
+ self._token = otel_context.attach(ctx)
+ return self._span
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self._token is not None:
+ otel_context.detach(self._token)
+ self._token = None
+ if exc_val is not None:
+ self._span.set_error(exc_val)
+ self._span.end()
+ elif self._end_on_exit:
+ self._span.end()
+ return False
+
+
+class TracingSpan:
+ """Wrapper around an OTel span.
+
+ Use :meth:`attach_context` as a context manager around any RPC call.
+ The default (``end_on_exit=True``) is for single-shot operations; pass
+ ``end_on_exit=False`` for streaming RPCs where the result iterator owns
+ ``end()``.
+ """
+
+ def __init__(self, span):
+ self._span = span
+
+ def set_error(self, exception):
+ _set_error_on_span(self._span, exception)
+
+ def set_attribute(self, key, value):
+ self._span.set_attribute(key, value)
+
+ def end(self):
+ self._span.end()
+
+ def attach_context(self, end_on_exit=True):
+ return _AttachContext(self, end_on_exit)
+
+
+def _create_span(name, attributes=None, kind=None):
+ span = _tracer.start_span(
+ name,
+ kind=_KIND_MAP.get(kind, trace.SpanKind.CLIENT),
+ attributes=attributes or {},
+ )
+ return TracingSpan(span)
+
+
+def _enable_tracing(tracer=None):
+ global _enabled, _tracer
+
+ if _enabled:
+ return
+
+ _tracer = tracer if tracer is not None else trace.get_tracer("ydb.sdk")
+ _enabled = True
+ _registry.set_metadata_hook(_otel_metadata_hook)
+ _registry.set_create_span(_create_span)
+
+
+def _disable_tracing():
+ """Clear hooks and tracer; after this, :func:`~ydb.opentelemetry.enable_tracing` may be called again."""
+ global _enabled, _tracer
+
+ _registry.set_create_span(None)
+ _registry.set_metadata_hook(None)
+ _enabled = False
+ _tracer = None
diff --git a/contrib/python/ydb/py3/ydb/opentelemetry/tracing.py b/contrib/python/ydb/py3/ydb/opentelemetry/tracing.py
new file mode 100644
index 00000000000..1d0995df8d5
--- /dev/null
+++ b/contrib/python/ydb/py3/ydb/opentelemetry/tracing.py
@@ -0,0 +1,165 @@
+"""Internal SDK tracing helpers and registry."""
+
+import enum
+from typing import Optional, Tuple
+
+
+class SpanName(str, enum.Enum):
+ """Canonical span names used across the YDB SDK."""
+
+ CREATE_SESSION = "ydb.CreateSession"
+ EXECUTE_QUERY = "ydb.ExecuteQuery"
+ BEGIN_TRANSACTION = "ydb.BeginTransaction"
+ COMMIT = "ydb.Commit"
+ ROLLBACK = "ydb.Rollback"
+ DRIVER_INITIALIZE = "ydb.Driver.Initialize"
+ RUN_WITH_RETRY = "ydb.RunWithRetry"
+ TRY = "ydb.Try"
+
+
+class _NoopCtx:
+ __slots__ = ("_span",)
+
+ def __init__(self, span):
+ self._span = span
+
+ def __enter__(self):
+ return self._span
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+
+class _NoopSpan:
+ """Returned by create_ydb_span when tracing is disabled."""
+
+ def set_error(self, exception):
+ pass
+
+ def set_attribute(self, key, value):
+ pass
+
+ def end(self):
+ pass
+
+ def attach_context(self, end_on_exit=True):
+ return _NoopCtx(self)
+
+
+_NOOP_SPAN = _NoopSpan()
+
+
+class OtelTracingRegistry:
+ """Singleton registry for OpenTelemetry tracing.
+
+ By default everything is no-op until :func:`~ydb.opentelemetry.enable_tracing` is called.
+ """
+
+ def __init__(self):
+ self._metadata_hook = None
+ self._create_span_func = None
+
+ def is_active(self) -> bool:
+ return self._create_span_func is not None
+
+ def create_span(self, name, attributes=None, kind=None):
+ if self._create_span_func is None:
+ return _NOOP_SPAN
+ return self._create_span_func(name, attributes, kind=kind)
+
+ def get_trace_metadata(self):
+ if self._metadata_hook is not None:
+ return self._metadata_hook()
+ return []
+
+ def set_metadata_hook(self, hook):
+ self._metadata_hook = hook
+
+ def set_create_span(self, func):
+ self._create_span_func = func
+
+
+_registry = OtelTracingRegistry()
+
+
+def get_trace_metadata():
+ """Return tracing metadata for gRPC calls."""
+ return _registry.get_trace_metadata()
+
+
+def _split_endpoint(endpoint: Optional[str]) -> Tuple[str, int]:
+ ep = endpoint or ""
+ if ep.startswith("grpcs://"):
+ ep = ep[len("grpcs://") :]
+ elif ep.startswith("grpc://"):
+ ep = ep[len("grpc://") :]
+
+ if ep.startswith("["):
+ close = ep.find("]")
+ if close != -1 and len(ep) > close + 1 and ep[close + 1] == ":":
+ host = ep[: close + 1]
+ port_s = ep[close + 2 :]
+ return host, int(port_s) if port_s.isdigit() else 0
+
+ host, sep, port_s = ep.rpartition(":")
+ if not sep:
+ return ep, 0
+ return host, int(port_s) if port_s.isdigit() else 0
+
+
+def _build_ydb_attrs(driver_config, node_id=None, peer=None):
+ host, port = _split_endpoint(getattr(driver_config, "endpoint", None))
+ attrs = {
+ "db.system.name": "ydb",
+ "db.namespace": getattr(driver_config, "database", None) or "",
+ "server.address": host,
+ "server.port": port,
+ }
+ if peer is not None:
+ address, port_, location = peer
+ if address is not None:
+ attrs["network.peer.address"] = address
+ if port_ is not None:
+ attrs["network.peer.port"] = int(port_)
+ if location:
+ attrs["ydb.node.dc"] = location
+ if node_id is not None:
+ attrs["ydb.node.id"] = node_id
+ return attrs
+
+
+def create_span(name, attributes=None, kind="internal"):
+ """Create a span with no YDB-specific attributes (used for SDK-internal operations)."""
+ return _registry.create_span(name, attributes=attributes, kind=kind).attach_context()
+
+
+def create_ydb_span(name, driver_config, node_id=None, kind=None, peer=None):
+ """Create a span pre-filled with standard YDB attributes."""
+ if not _registry.is_active():
+ return _NOOP_SPAN
+ attrs = _build_ydb_attrs(driver_config, node_id, peer)
+ return _registry.create_span(name, attributes=attrs, kind=kind)
+
+
+def set_peer_attributes(span, peer):
+ """Fill in network.peer.* and ydb.node.dc on an existing span once the peer is known."""
+ if peer is None:
+ return
+ address, port, location = peer
+ if address is not None:
+ span.set_attribute("network.peer.address", address)
+ if port is not None:
+ span.set_attribute("network.peer.port", int(port))
+ if location:
+ span.set_attribute("ydb.node.dc", location)
+
+
+def span_finish_callback(span):
+ """Return an on_finish callable that ends *span* when a streaming result iterator completes."""
+
+ def _finish(exception=None):
+ if exception is not None:
+ span.set_error(exception)
+ span.end()
+
+ return _finish
diff --git a/contrib/python/ydb/py3/ydb/pool.py b/contrib/python/ydb/py3/ydb/pool.py
index 2901c573435..f1a4bdf940b 100644
--- a/contrib/python/ydb/py3/ydb/pool.py
+++ b/contrib/python/ydb/py3/ydb/pool.py
@@ -10,6 +10,7 @@ import random
from typing import Any, Callable, ContextManager, List, Optional, Set, Tuple, TYPE_CHECKING
from . import connection as connection_impl, issues, resolver, _utilities, tracing
+from .opentelemetry.tracing import SpanName, create_ydb_span
from abc import abstractmethod
from .connection import Connection, EndpointKey
@@ -400,23 +401,41 @@ class ConnectionPool(IConnectionPool):
self._store = ConnectionsCache(driver_config.use_all_nodes, driver_config.tracer)
self.tracer = driver_config.tracer
self._grpc_init = connection_impl.Connection(self._driver_config.endpoint, self._driver_config)
+ self._stopped = False
+ self._stop_guard = threading.Lock()
+ self._stop_event = threading.Event()
+ self._init_thread: Optional[threading.Thread] = None
if driver_config.disable_discovery:
- # If discovery is disabled, just add the initial endpoint to the store
- ready_connection = connection_impl.Connection.ready_factory(
- self._driver_config.endpoint,
- self._driver_config,
- ready_timeout=getattr(self._driver_config, "discovery_request_timeout", 10),
- )
- self._store.add(ready_connection)
+ # If discovery is disabled, establish the initial connection in a
+ # background thread, retrying until it succeeds or the pool is stopped.
+ # Doing this off the constructor keeps wait(timeout) as the blocking
+ # point and lets stop() interrupt the retry loop.
self._discovery_thread = None
+ self._init_thread = threading.Thread(
+ name="ydb_driver_initial_connection",
+ target=self._init_connection,
+ daemon=True,
+ )
+ self._init_thread.start()
else:
# Start discovery thread as usual
self._discovery_thread = Discovery(self._store, self._driver_config)
self._discovery_thread.start()
- self._stopped = False
- self._stop_guard = threading.Lock()
+ def _init_connection(self) -> None:
+ ready_timeout = getattr(self._driver_config, "discovery_request_timeout", 10)
+ while not self._stopped:
+ ready_connection = connection_impl.Connection.ready_factory(
+ self._driver_config.endpoint,
+ self._driver_config,
+ ready_timeout=ready_timeout,
+ )
+ if self._store.add(ready_connection):
+ return
+
+ logger.debug("Initial connection attempt failed")
+ self._stop_event.wait(1)
def stop(self, timeout: int = 10) -> None:
"""
@@ -430,11 +449,16 @@ class ConnectionPool(IConnectionPool):
return
self._stopped = True
+ self._stop_event.set()
if self._discovery_thread:
self._discovery_thread.stop()
self._grpc_init.close()
if self._discovery_thread:
self._discovery_thread.join(timeout)
+ if self._init_thread:
+ self._init_thread.join(timeout)
+ if self._discovery_thread is None:
+ self._store.cleanup()
def async_wait(self, fail_fast: bool = False) -> "futures.Future[None]":
"""
@@ -453,10 +477,11 @@ class ConnectionPool(IConnectionPool):
:param timeout: A timeout to wait in seconds
:return: None
"""
- if fail_fast:
- self._store.add_fast_fail().result(timeout)
- else:
- self._store.subscribe().result(timeout)
+ with create_ydb_span(SpanName.DRIVER_INITIALIZE, self._driver_config, kind="internal").attach_context():
+ if fail_fast:
+ self._store.add_fast_fail().result(timeout)
+ else:
+ self._store.subscribe().result(timeout)
def _on_disconnected(self, connection: Connection) -> None:
"""
diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py
index 7fea6cd0e8c..093c7d55452 100644
--- a/contrib/python/ydb/py3/ydb/query/base.py
+++ b/contrib/python/ydb/py3/ydb/query/base.py
@@ -27,7 +27,6 @@ from .. import _apis
from ydb._topic_common.common import CallFromSyncToAsync, _get_shared_event_loop
from ydb._grpc.grpcwrapper.common_utils import to_thread
-
if typing.TYPE_CHECKING:
from .transaction import BaseQueryTxContext
from .session import BaseQuerySession
@@ -73,9 +72,12 @@ class QueryResultSetFormat(enum.IntEnum):
class SyncResponseContextIterator(_utilities.SyncResponseIterator):
- def __init__(self, it, wrapper, on_error=None):
+ """Streams ExecuteQuery results."""
+
+ def __init__(self, it, wrapper, on_error=None, on_finish=None):
super().__init__(it, wrapper)
self._on_error = on_error
+ self._on_finish = on_finish
def __enter__(self) -> "SyncResponseContextIterator":
return self
@@ -86,6 +88,7 @@ class SyncResponseContextIterator(_utilities.SyncResponseIterator):
except StopIteration:
# Normal stream termination is not an error and must not invalidate
# the session.
+ self._call_on_finish()
raise
except BaseException as e:
# BaseException (not Exception) for parity with the async iterator:
@@ -95,8 +98,17 @@ class SyncResponseContextIterator(_utilities.SyncResponseIterator):
# SessionBusy.
if self._on_error:
self._on_error(e)
+ self._call_on_finish(e)
raise
+ def _call_on_finish(self, exception=None):
+ if self._on_finish is not None:
+ self._on_finish(exception)
+ self._on_finish = None
+
+ def __del__(self):
+ self._call_on_finish()
+
def __exit__(self, exc_type, exc_val, exc_tb):
# To close stream on YDB it is necessary to scroll through it to the end.
# Errors during the cleanup drain have already been reported to _on_error
@@ -107,6 +119,7 @@ class SyncResponseContextIterator(_utilities.SyncResponseIterator):
pass
except BaseException:
pass
+ self._call_on_finish()
class QueryClientSettings:
diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py
index f2099f8cdbf..a9c1b4a50d9 100644
--- a/contrib/python/ydb/py3/ydb/query/session.py
+++ b/contrib/python/ydb/py3/ydb/query/session.py
@@ -18,6 +18,7 @@ from . import base
from .base import QueryExplainResultFormat
from .. import _apis, issues, _utilities
+from ..opentelemetry.tracing import SpanName, create_ydb_span, set_peer_attributes, span_finish_callback
from ..settings import BaseRequestSettings
from ..connection import _RpcState as RpcState, EndpointKey
from .._grpc.grpcwrapper import common_utils
@@ -30,7 +31,7 @@ from .transaction import QueryTxContext
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT
if TYPE_CHECKING:
- from ..driver import Driver as SyncDriver
+ from ..driver import Driver as SyncDriver, DriverConfig
from ..aio.driver import Driver as AsyncDriver
@@ -46,9 +47,30 @@ def wrapper_create_session(
issues._process_response(message.status)
session._session_id = message.session_id
session._node_id = message.node_id
+ session._peer = _resolve_peer(session._driver, message.node_id)
return session
+def _resolve_peer(driver, node_id):
+ """Look up network.peer.* / ydb.node.dc for a node in the driver's endpoint map."""
+ if node_id is None:
+ return None
+ store = getattr(driver, "_store", None)
+ if store is None:
+ return None
+ by_node = getattr(store, "connections_by_node_id", None)
+ if not by_node:
+ return None
+ connection = by_node.get(node_id)
+ if connection is None:
+ return None
+ return (
+ getattr(connection, "peer_address", None),
+ getattr(connection, "peer_port", None),
+ getattr(connection, "peer_location", None),
+ )
+
+
def wrapper_delete_session(
rpc_state: RpcState,
response_pb: _apis.ydb_query.DeleteSessionResponse,
@@ -69,6 +91,7 @@ class BaseQuerySession(abc.ABC, Generic[DriverT]):
# Session data
_session_id: Optional[str] = None
_node_id: Optional[int] = None
+ _peer: Optional[tuple] = None
_closed: bool = False
_invalidated: bool = False
@@ -85,6 +108,10 @@ class BaseQuerySession(abc.ABC, Generic[DriverT]):
self._last_query_stats = None
@property
+ def _driver_config(self) -> Optional["DriverConfig"]:
+ return getattr(self._driver, "_driver_config", None)
+
+ @property
def session_id(self) -> Optional[str]:
return self._session_id
@@ -391,8 +418,10 @@ class QuerySession(BaseQuerySession["SyncDriver"]):
if self._closed:
raise RuntimeError("Session is already closed.")
- self._create_call(settings=settings)
- self._attach()
+ with create_ydb_span(SpanName.CREATE_SESSION, self._driver_config).attach_context() as span:
+ self._create_call(settings=settings)
+ set_peer_attributes(span, self._peer)
+ self._attach()
return self
@@ -458,20 +487,27 @@ class QuerySession(BaseQuerySession["SyncDriver"]):
"""
self._check_session_ready_to_use()
- stream_it = self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=True,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ SpanName.EXECUTE_QUERY,
+ self._driver_config,
+ node_id=self._node_id,
+ peer=self._peer,
)
+ with span.attach_context(end_on_exit=False):
+ stream_it = self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
return base.SyncResponseContextIterator(
stream_it,
lambda resp: base.wrap_execute_query_response(
@@ -481,6 +517,7 @@ class QuerySession(BaseQuerySession["SyncDriver"]):
settings=self._settings,
),
on_error=self._on_execute_stream_error,
+ on_finish=span_finish_callback(span),
)
def explain(
diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py
index fdcefb0b801..1d278ac2fee 100644
--- a/contrib/python/ydb/py3/ydb/query/transaction.py
+++ b/contrib/python/ydb/py3/ydb/query/transaction.py
@@ -17,6 +17,7 @@ from .. import (
_apis,
issues,
)
+from ..opentelemetry.tracing import SpanName, create_ydb_span, span_finish_callback
from .._grpc.grpcwrapper import ydb_topic as _ydb_topic
from .._grpc.grpcwrapper import ydb_query as _ydb_query
from ..connection import _RpcState as RpcState
@@ -245,6 +246,10 @@ class BaseQueryTxContext(base.CallbackHandler, Generic[DriverT]):
self._last_query_stats = None
@property
+ def _driver_config(self):
+ return getattr(self._driver, "_driver_config", None)
+
+ @property
def session_id(self) -> Optional[str]:
"""
A transaction's session id
@@ -523,7 +528,13 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]):
:return: Transaction object or exception if begin is failed
"""
- self._begin_call(settings)
+ with create_ydb_span(
+ SpanName.BEGIN_TRANSACTION,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ self._begin_call(settings)
return self
@@ -545,13 +556,19 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]):
self._ensure_prev_stream_finished()
- try:
- self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT)
- self._commit_call(settings)
- self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None)
- except BaseException as e: # TODO: probably should be less wide
- self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e)
- raise e
+ with create_ydb_span(
+ SpanName.COMMIT,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ try:
+ self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT)
+ self._commit_call(settings)
+ self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None)
+ except BaseException as e: # TODO: probably should be less wide
+ self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e)
+ raise e
def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
@@ -571,13 +588,19 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]):
self._ensure_prev_stream_finished()
- try:
- self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK)
- self._rollback_call(settings)
- self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None)
- except BaseException as e: # TODO: probably should be less wide
- self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e)
- raise e
+ with create_ydb_span(
+ SpanName.ROLLBACK,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
+ ).attach_context():
+ try:
+ self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK)
+ self._rollback_call(settings)
+ self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None)
+ except BaseException as e: # TODO: probably should be less wide
+ self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e)
+ raise e
def execute(
self,
@@ -626,20 +649,27 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]):
"""
self._ensure_prev_stream_finished()
- stream_it = self._execute_call(
- query=query,
- commit_tx=commit_tx,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- parameters=parameters,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ SpanName.EXECUTE_QUERY,
+ self._driver_config,
+ node_id=self.session.node_id,
+ peer=getattr(self.session, "_peer", None),
)
+ with span.attach_context(end_on_exit=False):
+ stream_it = self._execute_call(
+ query=query,
+ commit_tx=commit_tx,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
self._prev_stream = base.SyncResponseContextIterator(
stream_it,
lambda resp: base.wrap_execute_query_response(
@@ -651,5 +681,6 @@ class QueryTxContext(BaseQueryTxContext["SyncDriver"]):
settings=self.session._settings,
),
on_error=self.session._on_execute_stream_error,
+ on_finish=span_finish_callback(span),
)
return self._prev_stream
diff --git a/contrib/python/ydb/py3/ydb/resolver.py b/contrib/python/ydb/py3/ydb/resolver.py
index e9e8e6f93f3..1402e542abc 100644
--- a/contrib/python/ydb/py3/ydb/resolver.py
+++ b/contrib/python/ydb/py3/ydb/resolver.py
@@ -53,7 +53,11 @@ class EndpointInfo(object):
ssl_target_name_override = self.address
endpoint_options = conn_impl.EndpointOptions(
- ssl_target_name_override=ssl_target_name_override, node_id=self.node_id
+ ssl_target_name_override=ssl_target_name_override,
+ node_id=self.node_id,
+ address=self.address,
+ port=self.port,
+ location=self.location,
)
if self.ipv6_addrs or self.ipv4_addrs:
diff --git a/contrib/python/ydb/py3/ydb/retries.py b/contrib/python/ydb/py3/ydb/retries.py
index c151e3d2146..4b7c137f39e 100644
--- a/contrib/python/ydb/py3/ydb/retries.py
+++ b/contrib/python/ydb/py3/ydb/retries.py
@@ -7,6 +7,11 @@ from typing import Any, Callable, Generator, Optional, Union
from . import issues
from ._errors import check_retriable_error
+from .opentelemetry.tracing import SpanName, create_span as _create_span
+
+
+def _try_span_attrs(backoff_ms: Optional[int]):
+ return {"ydb.retry.backoff_ms": backoff_ms} if backoff_ms is not None else None
class BackoffSettings:
@@ -129,19 +134,18 @@ def retry_operation_impl(
if not retriable_info.is_retriable:
raise
- skip_yield_error_types = [
+ skip_yield_error_types = (
issues.Aborted,
issues.BadSession,
issues.NotFound,
issues.InternalError,
- ]
+ )
- yield_sleep = True
- for t in skip_yield_error_types:
- if isinstance(e, t):
- yield_sleep = False
-
- if yield_sleep:
+ if isinstance(e, skip_yield_error_types):
+ # Skip the inter-attempt sleep but still emit a marker so consumers
+ # advance per-attempt bookkeeping (e.g. ``ydb.Try`` spans get backoff=0).
+ yield YdbRetryOperationSleepOpt(0.0)
+ else:
yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)
except Exception as e:
@@ -159,12 +163,21 @@ def retry_operation_sync(
*args: Any,
**kwargs: Any,
) -> Any:
- opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
- for next_opt in opt_generator:
- if isinstance(next_opt, YdbRetryOperationSleepOpt):
- time.sleep(next_opt.timeout)
- else:
- return next_opt.result
+ backoff_ms: Optional[int] = None
+
+ @functools.wraps(callee)
+ def traced_callee(*a: Any, **kw: Any) -> Any:
+ with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)):
+ return callee(*a, **kw)
+
+ with _create_span(SpanName.RUN_WITH_RETRY):
+ for next_opt in retry_operation_impl(traced_callee, retry_settings, *args, **kwargs):
+ if isinstance(next_opt, YdbRetryOperationSleepOpt):
+ backoff_ms = int(next_opt.timeout * 1000)
+ if next_opt.timeout > 0:
+ time.sleep(next_opt.timeout)
+ else:
+ return next_opt.result
return None
@@ -186,15 +199,20 @@ async def retry_operation_async( # pylint: disable=W1113
Returns awaitable result of coroutine. If retries are not succussful exception is raised.
"""
- opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
- for next_opt in opt_generator:
- if isinstance(next_opt, YdbRetryOperationSleepOpt):
- await asyncio.sleep(next_opt.timeout)
- else:
- try:
- return await next_opt.result
- except BaseException as e: # pylint: disable=W0703
- next_opt.set_exception(e)
+ backoff_ms: Optional[int] = None
+ with _create_span(SpanName.RUN_WITH_RETRY):
+ for next_opt in retry_operation_impl(callee, retry_settings, *args, **kwargs):
+ if isinstance(next_opt, YdbRetryOperationSleepOpt):
+ backoff_ms = int(next_opt.timeout * 1000)
+ if next_opt.timeout > 0:
+ await asyncio.sleep(next_opt.timeout)
+ else:
+ with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)) as try_span:
+ try:
+ return await next_opt.result
+ except BaseException as e: # pylint: disable=W0703
+ try_span.set_error(e)
+ next_opt.set_exception(e)
return None
diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py
index d7d1d1869be..989938e1be0 100644
--- a/contrib/python/ydb/py3/ydb/ydb_version.py
+++ b/contrib/python/ydb/py3/ydb/ydb_version.py
@@ -1 +1 @@
-VERSION = "3.28.4"
+VERSION = "3.29.6"