aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-07-25 18:35:20 +0300
committerGitHub <noreply@github.com>2025-07-25 15:35:20 +0000
commit4e27de679ea7b3c29cea76cdf0f4dd3c9f0f0057 (patch)
treee44083defe12afbba565b316cacf01605f41471c
parent3e762190b57ab14961a201d37a799fd873c7b65a (diff)
downloadydb-4e27de679ea7b3c29cea76cdf0f4dd3c9f0f0057.tar.gz
Fix StreamLookupJoin freeze (#21693)
-rw-r--r--.github/config/muted_ya.txt1
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp45
2 files changed, 42 insertions, 4 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index 6fc5836624d..372af30ec07 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -36,7 +36,6 @@ ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
ydb/core/kqp/ut/query KqpLimits.StreamWrite+Allowed
ydb/core/kqp/ut/query KqpStats.DeferredEffects+UseSink
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
-ydb/core/kqp/ut/scan KqpSplit.StreamLookupJoinDeliveryProblemAfterFirstResult
ydb/core/kqp/ut/scheme KqpOlapScheme.AddPgColumnWithStore
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme unittest.[*/*] chunk
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 48d3cdf4bcb..5650bf6abe6 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -158,6 +158,7 @@ private:
struct TShardState {
ui64 RetryAttempts = 0;
std::unordered_set<ui64> Reads;
+ bool HasPipe = false;
};
struct TReads {
@@ -222,6 +223,18 @@ private:
return result;
}
+
+ bool NeedToCreatePipe(ui64 shardId) {
+ return !ReadsPerShard[shardId].HasPipe;
+ }
+
+ void SetPipeCreated(ui64 shardId) {
+ ReadsPerShard[shardId].HasPipe = true;
+ }
+
+ void SetPipeDestroyed(ui64 shardId) {
+ ReadsPerShard[shardId].HasPipe = false;
+ }
};
struct TEvPrivate {
@@ -498,9 +511,18 @@ private:
request->Record.SetMaxRows(defaultSettings.GetMaxRows());
request->Record.SetMaxBytes(defaultSettings.GetMaxBytes());
- Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true),
+ const bool needToCreatePipe = Reads.NeedToCreatePipe(read.ShardId);
+
+ Send(PipeCacheId,
+ new TEvPipeCache::TEvForward(
+ request.Release(), read.ShardId, TEvPipeCache::TEvForwardOptions{
+ .AutoConnect = needToCreatePipe,
+ .Subscribe = needToCreatePipe,
+ }),
IEventHandle::FlagTrackDelivery);
+ Reads.SetPipeCreated(read.ShardId);
+
CA_LOG_D("TEvReadAck was sent to shard: " << read.ShardId);
if (auto delay = ShardTimeout()) {
@@ -522,6 +544,8 @@ private:
const auto& tabletId = ev->Get()->TabletId;
+ Reads.SetPipeDestroyed(tabletId);
+
TVector<TReadState*> toRetry;
for (auto* read : Reads.GetShardReads(tabletId)) {
if (read->State == EReadState::Running) {
@@ -560,6 +584,7 @@ private:
if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) {
if (ev->Get()->InstantStart) {
+ auto guard = BindAllocator();
auto requests = StreamLookupWorker->RebuildRequest(read.Id, ReadId);
for (auto& request : requests) {
StartTableRead(read.ShardId, std::move(request));
@@ -630,8 +655,21 @@ private:
<< ", lockTxId=" << record.GetLockTxId()
<< ", lockNodeId=" << record.GetLockNodeId());
- Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), shardId, true),
- IEventHandle::FlagTrackDelivery, 0, LookupActorSpan.GetTraceId());
+ const bool needToCreatePipe = Reads.NeedToCreatePipe(read.ShardId);
+
+ Send(PipeCacheId,
+ new TEvPipeCache::TEvForward(
+ request.Release(),
+ shardId,
+ TEvPipeCache::TEvForwardOptions{
+ .AutoConnect = needToCreatePipe,
+ .Subscribe = needToCreatePipe,
+ }),
+ IEventHandle::FlagTrackDelivery,
+ 0,
+ LookupActorSpan.GetTraceId());
+
+ Reads.SetPipeCreated(read.ShardId);
read.State = EReadState::Running;
@@ -669,6 +707,7 @@ private:
auto delay = Reads.CalcDelayForShard(failedRead, allowInstantRetry);
if (delay == TDuration::Zero()) {
+ auto guard = BindAllocator();
auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, ReadId);
for (auto& request : requests) {
StartTableRead(failedRead.ShardId, std::move(request));