aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-02-22 19:22:16 +0300
committerssmike <ssmike@ydb.tech>2023-02-22 19:22:16 +0300
commitd6db0fa7fd05eea98ca692ecc6bd29c2b08700c3 (patch)
treefd098cf49cc5dcde9c534de801294ab565a8cac8
parent1e6f361ca50218237c7d536cbba32cdee2e9fb2e (diff)
downloadydb-d6db0fa7fd05eea98ca692ecc6bd29c2b08700c3.tar.gz
cancel open iterators in readactor
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp19
1 files changed, 13 insertions, 6 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 025fff3e3db..34bc047bbbe 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -605,9 +605,7 @@ public:
}
CA_LOG_D("Retrying read #" << id);
- auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
- cancel->Record.SetReadId(id);
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
+ SendCancel(id);
if (Reads[id].SerializedContinuationToken) {
NKikimrTxDataShard::TReadContinuationToken token;
@@ -929,9 +927,7 @@ public:
if (Reads[id].IsLastMessage(msg)) {
if (!record.GetFinished()) {
- auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
- cancel->Record.SetReadId(id);
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
+ SendCancel(id);
}
Reads[id].Reset();
ResetReads++;
@@ -994,10 +990,21 @@ public:
void CommitState(const NYql::NDqProto::TCheckpoint&) override {}
void LoadState(const NYql::NDqProto::TSourceState&) override {}
+ void SendCancel(ui32 id) {
+ if (!Reads[id]) {
+ return;
+ }
+ auto* state = Reads[id].Shard;
+ auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
+ cancel->Record.SetReadId(id);
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
+ }
+
void PassAway() override {
{
auto guard = BindAllocator();
Results.clear();
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
}
TBase::PassAway();
}