diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-11-12 16:35:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 16:35:17 +0300 |
commit | 691458b0c392a9ab6775bb63e81abfe252697370 (patch) | |
tree | fa2f13fb2a8861b60d05cabdd74cda5076d43fd3 | |
parent | f1765fb658be2d0d8e35797ca894c125d1bad507 (diff) | |
download | ydb-691458b0c392a9ab6775bb63e81abfe252697370.tar.gz |
YQ-3844 Shared reading: add restart session sensor (#11510)
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h | 1 | ||||
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp | 2 | ||||
-rw-r--r-- | ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 6 |
3 files changed, 8 insertions, 1 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h b/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h index b53f6399a0..15e60b54ce 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h +++ b/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h @@ -15,6 +15,7 @@ struct TopicSessionClientStatistic { struct TopicSessionCommonStatistic { ui64 UnreadBytes = 0; + ui64 RestartSessionByOffsets = 0; }; struct TopicSessionParams { diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 90b7f88977..3749fae418 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -484,7 +484,7 @@ TString TRowDispatcher::GetInternalState() { str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId; for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) { str << " / " << actorId << "\n"; - str << " unread bytes " << sessionInfo.Stat.UnreadBytes << "\n"; + str << " unread bytes " << sessionInfo.Stat.UnreadBytes << " restarts by offsets " << sessionInfo.Stat.RestartSessionByOffsets << "\n"; for (auto& [readActorId, consumer] : sessionInfo.Consumers) { str << " " << consumer->QueryId << " " << readActorId << " unread rows " << consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 33b6a9edb8..7d8c465b9a 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -34,6 +34,7 @@ struct TTopicSessionMetrics { RowsRead = SubGroup->GetCounter("RowsRead", true); InFlySubscribe = SubGroup->GetCounter("InFlySubscribe"); ReconnectRate = SubGroup->GetCounter("ReconnectRate", true); + RestartSessionByOffsets = counters->GetCounter("RestartSessionByOffsets", true); } ~TTopicSessionMetrics() { @@ -45,6 +46,7 @@ struct TTopicSessionMetrics { ::NMonitoring::TDynamicCounters::TCounterPtr RowsRead; ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe; ::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate; + ::NMonitoring::TDynamicCounters::TCounterPtr RestartSessionByOffsets; }; struct TEvPrivate { @@ -171,6 +173,7 @@ private: THashMap<TString, ui64> FieldsIndexes; NYql::IPqGateway::TPtr PqGateway; TMaybe<TString> ConsumerName; + ui64 RestartSessionByOffsets = 0; public: explicit TTopicSession( @@ -750,6 +753,8 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { if (ReadSession) { if (clientInfo.Settings.HasOffset() && (clientInfo.Settings.GetOffset() <= LastMessageOffset)) { LOG_ROW_DISPATCHER_INFO("New client has less offset (" << clientInfo.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session"); + Metrics.RestartSessionByOffsets->Inc(); + ++RestartSessionByOffsets; StopReadSession(); } } @@ -917,6 +922,7 @@ void TTopicSession::HandleException(const std::exception& e) { void TTopicSession::SendStatistic() { TopicSessionStatistic stat; stat.Common.UnreadBytes = UnreadBytes; + stat.Common.RestartSessionByOffsets = RestartSessionByOffsets; stat.SessionKey = TopicSessionParams{Endpoint, Database, TopicPath, PartitionId}; stat.Clients.reserve(Clients.size()); for (auto& [readActorId, info] : Clients) { |