diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-07-25 18:35:20 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-07-25 15:35:20 +0000 |
commit | 4e27de679ea7b3c29cea76cdf0f4dd3c9f0f0057 (patch) | |
tree | e44083defe12afbba565b316cacf01605f41471c | |
parent | 3e762190b57ab14961a201d37a799fd873c7b65a (diff) | |
download | ydb-4e27de679ea7b3c29cea76cdf0f4dd3c9f0f0057.tar.gz |
Fix StreamLookupJoin freeze (#21693)
-rw-r--r-- | .github/config/muted_ya.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 45 |
2 files changed, 42 insertions, 4 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 6fc5836624d..372af30ec07 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -36,7 +36,6 @@ ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore ydb/core/kqp/ut/query KqpLimits.StreamWrite+Allowed ydb/core/kqp/ut/query KqpStats.DeferredEffects+UseSink ydb/core/kqp/ut/query KqpStats.SysViewClientLost -ydb/core/kqp/ut/scan KqpSplit.StreamLookupJoinDeliveryProblemAfterFirstResult ydb/core/kqp/ut/scheme KqpOlapScheme.AddPgColumnWithStore ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns ydb/core/kqp/ut/scheme unittest.[*/*] chunk diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 48d3cdf4bcb..5650bf6abe6 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -158,6 +158,7 @@ private: struct TShardState { ui64 RetryAttempts = 0; std::unordered_set<ui64> Reads; + bool HasPipe = false; }; struct TReads { @@ -222,6 +223,18 @@ private: return result; } + + bool NeedToCreatePipe(ui64 shardId) { + return !ReadsPerShard[shardId].HasPipe; + } + + void SetPipeCreated(ui64 shardId) { + ReadsPerShard[shardId].HasPipe = true; + } + + void SetPipeDestroyed(ui64 shardId) { + ReadsPerShard[shardId].HasPipe = false; + } }; struct TEvPrivate { @@ -498,9 +511,18 @@ private: request->Record.SetMaxRows(defaultSettings.GetMaxRows()); request->Record.SetMaxBytes(defaultSettings.GetMaxBytes()); - Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true), + const bool needToCreatePipe = Reads.NeedToCreatePipe(read.ShardId); + + Send(PipeCacheId, + new TEvPipeCache::TEvForward( + request.Release(), read.ShardId, TEvPipeCache::TEvForwardOptions{ + .AutoConnect = needToCreatePipe, + .Subscribe = needToCreatePipe, + }), IEventHandle::FlagTrackDelivery); + Reads.SetPipeCreated(read.ShardId); + CA_LOG_D("TEvReadAck was sent to shard: " << read.ShardId); if (auto delay = ShardTimeout()) { @@ -522,6 +544,8 @@ private: const auto& tabletId = ev->Get()->TabletId; + Reads.SetPipeDestroyed(tabletId); + TVector<TReadState*> toRetry; for (auto* read : Reads.GetShardReads(tabletId)) { if (read->State == EReadState::Running) { @@ -560,6 +584,7 @@ private: if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) { if (ev->Get()->InstantStart) { + auto guard = BindAllocator(); auto requests = StreamLookupWorker->RebuildRequest(read.Id, ReadId); for (auto& request : requests) { StartTableRead(read.ShardId, std::move(request)); @@ -630,8 +655,21 @@ private: << ", lockTxId=" << record.GetLockTxId() << ", lockNodeId=" << record.GetLockNodeId()); - Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), shardId, true), - IEventHandle::FlagTrackDelivery, 0, LookupActorSpan.GetTraceId()); + const bool needToCreatePipe = Reads.NeedToCreatePipe(read.ShardId); + + Send(PipeCacheId, + new TEvPipeCache::TEvForward( + request.Release(), + shardId, + TEvPipeCache::TEvForwardOptions{ + .AutoConnect = needToCreatePipe, + .Subscribe = needToCreatePipe, + }), + IEventHandle::FlagTrackDelivery, + 0, + LookupActorSpan.GetTraceId()); + + Reads.SetPipeCreated(read.ShardId); read.State = EReadState::Running; @@ -669,6 +707,7 @@ private: auto delay = Reads.CalcDelayForShard(failedRead, allowInstantRetry); if (delay == TDuration::Zero()) { + auto guard = BindAllocator(); auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, ReadId); for (auto& request : requests) { StartTableRead(failedRead.ShardId, std::move(request)); |