diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-24 13:52:49 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-24 13:52:49 +0300 |
commit | 6b09dd4323839241524b076042c4ec1db82e329b (patch) | |
tree | f93f7e5900ba0ae8b15601dae0ce736d7c0f62fb | |
parent | 9cca84faf002b556c6ea0751432b3a68b707a1ed (diff) | |
download | ydb-6b09dd4323839241524b076042c4ec1db82e329b.tar.gz |
add undelivery metric and cleanup finalize logic
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 3 |
4 files changed, 12 insertions, 6 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 7ae27c2b59f..2fe3213e249 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -777,6 +777,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co ReadActorRetries = KqpGroup->GetCounter("IteratorReads/Retries", true); DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true); DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true); + IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true); LiteralTxTotalTimeHistogram = KqpGroup->GetHistogram( "PhyTx/LiteralTxTotalTimeMs", NMonitoring::ExponentialHistogram(10, 2, 1)); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index e0dfa9b8ac2..bfeed50c1d2 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -378,6 +378,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries; ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails; ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages; + ::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems; // Physical tx duration NMonitoring::THistogramPtr LiteralTxTotalTimeHistogram; diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 82a005f882b..77a607bc691 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -924,10 +924,13 @@ public: ++ErrorsCount; TVector<ui32> reads; - reads.swap(ReadIdByTabletId[msg.TabletId]); + reads = ReadIdByTabletId[msg.TabletId]; + CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered); for (auto read : reads) { - CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered); - RetryRead(read, false); + if (Reads[read]) { + Counters->IteratorDeliveryProblems->Inc(); + } + RetryRead(read); } } @@ -941,7 +944,7 @@ public: auto* state = Reads[id].Shard; auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); - Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId)); + Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId, false)); Reads[id].Reset(); ResetReads++; @@ -1239,10 +1242,10 @@ public: { auto guard = BindAllocator(); Results.clear(); - Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); for (size_t i = 0; i < Reads.size(); ++i) { ResetRead(i); } + Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); } TBase::PassAway(); } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index c655182b100..eaaf146b9c7 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -196,7 +196,7 @@ private: Counters->SentIteratorCancels->Inc(); auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId)); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId, false)); } } @@ -351,6 +351,7 @@ private: for (auto* read : shardIt->second.Reads) { if (read->State == EReadState::Running) { + Counters->IteratorDeliveryProblems->Inc(); for (auto& key : read->Keys) { UnprocessedKeys.emplace_back(std::move(key)); } |