aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-21 22:34:34 +0300
committerssmike <ssmike@ydb.tech>2023-03-21 22:34:34 +0300
commit5b12a4c00e39fce0f7b518dafd05e1473d0ce06f (patch)
treede48f1107168514a9d8dbc3a1c8f5d6de11ff199
parentf2900afd402bd86173b9e8e419b3ae70e51c5e18 (diff)
downloadydb-5b12a4c00e39fce0f7b518dafd05e1473d0ce06f.tar.gz
ensure evcancel is sent
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp23
1 files changed, 7 insertions, 16 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 76b8dbacdd..82a005f882 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -738,7 +738,6 @@ public:
}
CA_LOG_D("Retrying read #" << id);
- SendCancel(id);
ResetRead(id);
if (Reads[id].SerializedContinuationToken) {
@@ -938,6 +937,12 @@ public:
void ResetRead(size_t id) {
if (Reads[id]) {
+ Counters->SentIteratorCancels->Inc();
+ auto* state = Reads[id].Shard;
+ auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
+ cancel->Record.SetReadId(id);
+ Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId));
+
Reads[id].Reset();
ResetReads++;
}
@@ -1156,9 +1161,6 @@ public:
}
if (Reads[id].IsLastMessage(msg)) {
- if (!record.GetFinished()) {
- SendCancel(id);
- }
ResetRead(id);
}
@@ -1232,17 +1234,6 @@ public:
void CommitState(const NYql::NDqProto::TCheckpoint&) override {}
void LoadState(const NYql::NDqProto::TSourceState&) override {}
- void SendCancel(ui32 id) {
- if (!Reads[id]) {
- return;
- }
- Counters->SentIteratorCancels->Inc();
- auto* state = Reads[id].Shard;
- auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
- cancel->Record.SetReadId(id);
- Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId));
- }
-
void PassAway() override {
Counters->ReadActorsCount->Dec();
{
@@ -1250,7 +1241,7 @@ public:
Results.clear();
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
for (size_t i = 0; i < Reads.size(); ++i) {
- SendCancel(i);
+ ResetRead(i);
}
}
TBase::PassAway();