aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-24 13:52:49 +0300
committerssmike <ssmike@ydb.tech>2023-03-24 13:52:49 +0300
commit6b09dd4323839241524b076042c4ec1db82e329b (patch)
treef93f7e5900ba0ae8b15601dae0ce736d7c0f62fb
parent9cca84faf002b556c6ea0751432b3a68b707a1ed (diff)
downloadydb-6b09dd4323839241524b076042c4ec1db82e329b.tar.gz
add undelivery metric and cleanup finalize logic
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp1
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h1
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp13
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp3
4 files changed, 12 insertions, 6 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 7ae27c2b59f..2fe3213e249 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -777,6 +777,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
ReadActorRetries = KqpGroup->GetCounter("IteratorReads/Retries", true);
DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true);
DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true);
+ IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true);
LiteralTxTotalTimeHistogram = KqpGroup->GetHistogram(
"PhyTx/LiteralTxTotalTimeMs", NMonitoring::ExponentialHistogram(10, 2, 1));
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index e0dfa9b8ac2..bfeed50c1d2 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -378,6 +378,7 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries;
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails;
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
+ ::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;
// Physical tx duration
NMonitoring::THistogramPtr LiteralTxTotalTimeHistogram;
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 82a005f882b..77a607bc691 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -924,10 +924,13 @@ public:
++ErrorsCount;
TVector<ui32> reads;
- reads.swap(ReadIdByTabletId[msg.TabletId]);
+ reads = ReadIdByTabletId[msg.TabletId];
+ CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered);
for (auto read : reads) {
- CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered);
- RetryRead(read, false);
+ if (Reads[read]) {
+ Counters->IteratorDeliveryProblems->Inc();
+ }
+ RetryRead(read);
}
}
@@ -941,7 +944,7 @@ public:
auto* state = Reads[id].Shard;
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
- Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId));
+ Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId, false));
Reads[id].Reset();
ResetReads++;
@@ -1239,10 +1242,10 @@ public:
{
auto guard = BindAllocator();
Results.clear();
- Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
for (size_t i = 0; i < Reads.size(); ++i) {
ResetRead(i);
}
+ Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
}
TBase::PassAway();
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index c655182b100..eaaf146b9c7 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -196,7 +196,7 @@ private:
Counters->SentIteratorCancels->Inc();
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId));
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId, false));
}
}
@@ -351,6 +351,7 @@ private:
for (auto* read : shardIt->second.Reads) {
if (read->State == EReadState::Running) {
+ Counters->IteratorDeliveryProblems->Inc();
for (auto& key : read->Keys) {
UnprocessedKeys.emplace_back(std::move(key));
}