diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-01 01:18:33 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-01 01:18:33 +0300 |
commit | 22f99c1e277287a012a9b629919dbc8874d79912 (patch) | |
tree | 766d8ccc128eeaa8a0b1d0716b1f069d1c72dabf | |
parent | 39783e63c49f52654ac4860ef64681e6dee46806 (diff) | |
download | ydb-22f99c1e277287a012a9b629919dbc8874d79912.tar.gz |
close all streamlookup&source iterators
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 5 |
2 files changed, 9 insertions, 1 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 2486ce6ec4f..c6880b9158f 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -1061,7 +1061,7 @@ public: auto* state = Reads[id].Shard; auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); - Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); + Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId)); } void PassAway() override { @@ -1069,6 +1069,9 @@ public: auto guard = BindAllocator(); Results.clear(); Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); + for (size_t i = 0; i < Reads.size(); ++i) { + SendCancel(i); + } } TBase::PassAway(); } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index f5f888a86c9..39cbd9c3b49 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -182,6 +182,11 @@ private: { auto alloc = BindAllocator(); Input.Clear(); + for (auto& [id, state] : Reads) { + auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); + cancel->Record.SetReadId(id); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId)); + } } Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); |