diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-21 22:34:34 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-21 22:34:34 +0300 |
commit | 5b12a4c00e39fce0f7b518dafd05e1473d0ce06f (patch) | |
tree | de48f1107168514a9d8dbc3a1c8f5d6de11ff199 | |
parent | f2900afd402bd86173b9e8e419b3ae70e51c5e18 (diff) | |
download | ydb-5b12a4c00e39fce0f7b518dafd05e1473d0ce06f.tar.gz |
ensure evcancel is sent
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 23 |
1 files changed, 7 insertions, 16 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 76b8dbacdd..82a005f882 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -738,7 +738,6 @@ public: } CA_LOG_D("Retrying read #" << id); - SendCancel(id); ResetRead(id); if (Reads[id].SerializedContinuationToken) { @@ -938,6 +937,12 @@ public: void ResetRead(size_t id) { if (Reads[id]) { + Counters->SentIteratorCancels->Inc(); + auto* state = Reads[id].Shard; + auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); + cancel->Record.SetReadId(id); + Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId)); + Reads[id].Reset(); ResetReads++; } @@ -1156,9 +1161,6 @@ public: } if (Reads[id].IsLastMessage(msg)) { - if (!record.GetFinished()) { - SendCancel(id); - } ResetRead(id); } @@ -1232,17 +1234,6 @@ public: void CommitState(const NYql::NDqProto::TCheckpoint&) override {} void LoadState(const NYql::NDqProto::TSourceState&) override {} - void SendCancel(ui32 id) { - if (!Reads[id]) { - return; - } - Counters->SentIteratorCancels->Inc(); - auto* state = Reads[id].Shard; - auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); - cancel->Record.SetReadId(id); - Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId)); - } - void PassAway() override { Counters->ReadActorsCount->Dec(); { @@ -1250,7 +1241,7 @@ public: Results.clear(); Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); for (size_t i = 0; i < Reads.size(); ++i) { - SendCancel(i); + ResetRead(i); } } TBase::PassAway(); |