aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-01 01:18:33 +0300
committerssmike <ssmike@ydb.tech>2023-03-01 01:18:33 +0300
commit22f99c1e277287a012a9b629919dbc8874d79912 (patch)
tree766d8ccc128eeaa8a0b1d0716b1f069d1c72dabf
parent39783e63c49f52654ac4860ef64681e6dee46806 (diff)
downloadydb-22f99c1e277287a012a9b629919dbc8874d79912.tar.gz
close all streamlookup&source iterators
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp5
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp5
2 files changed, 9 insertions, 1 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 2486ce6ec4f..c6880b9158f 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -1061,7 +1061,7 @@ public:
auto* state = Reads[id].Shard;
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
- Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
+ Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId));
}
void PassAway() override {
@@ -1069,6 +1069,9 @@ public:
auto guard = BindAllocator();
Results.clear();
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
+ for (size_t i = 0; i < Reads.size(); ++i) {
+ SendCancel(i);
+ }
}
TBase::PassAway();
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index f5f888a86c9..39cbd9c3b49 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -182,6 +182,11 @@ private:
{
auto alloc = BindAllocator();
Input.Clear();
+ for (auto& [id, state] : Reads) {
+ auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
+ cancel->Record.SetReadId(id);
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId));
+ }
}
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));