diff options
author | ssmike <ssmike@ydb.tech> | 2023-02-22 19:22:16 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-02-22 19:22:16 +0300 |
commit | d6db0fa7fd05eea98ca692ecc6bd29c2b08700c3 (patch) | |
tree | fd098cf49cc5dcde9c534de801294ab565a8cac8 | |
parent | 1e6f361ca50218237c7d536cbba32cdee2e9fb2e (diff) | |
download | ydb-d6db0fa7fd05eea98ca692ecc6bd29c2b08700c3.tar.gz |
cancel open iterators in readactor
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 025fff3e3db..34bc047bbbe 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -605,9 +605,7 @@ public: } CA_LOG_D("Retrying read #" << id); - auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); - cancel->Record.SetReadId(id); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); + SendCancel(id); if (Reads[id].SerializedContinuationToken) { NKikimrTxDataShard::TReadContinuationToken token; @@ -929,9 +927,7 @@ public: if (Reads[id].IsLastMessage(msg)) { if (!record.GetFinished()) { - auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); - cancel->Record.SetReadId(id); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); + SendCancel(id); } Reads[id].Reset(); ResetReads++; @@ -994,10 +990,21 @@ public: void CommitState(const NYql::NDqProto::TCheckpoint&) override {} void LoadState(const NYql::NDqProto::TSourceState&) override {} + void SendCancel(ui32 id) { + if (!Reads[id]) { + return; + } + auto* state = Reads[id].Shard; + auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); + cancel->Record.SetReadId(id); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); + } + void PassAway() override { { auto guard = BindAllocator(); Results.clear(); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); } TBase::PassAway(); } |