aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-11-12 16:35:17 +0300
committerGitHub <noreply@github.com>2024-11-12 16:35:17 +0300
commit691458b0c392a9ab6775bb63e81abfe252697370 (patch)
treefa2f13fb2a8861b60d05cabdd74cda5076d43fd3
parentf1765fb658be2d0d8e35797ca894c125d1bad507 (diff)
downloadydb-691458b0c392a9ab6775bb63e81abfe252697370.tar.gz
YQ-3844 Shared reading: add restart session sensor (#11510)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp6
3 files changed, 8 insertions, 1 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h b/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h
index b53f6399a0..15e60b54ce 100644
--- a/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h
+++ b/ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h
@@ -15,6 +15,7 @@ struct TopicSessionClientStatistic {
struct TopicSessionCommonStatistic {
ui64 UnreadBytes = 0;
+ ui64 RestartSessionByOffsets = 0;
};
struct TopicSessionParams {
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 90b7f88977..3749fae418 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -484,7 +484,7 @@ TString TRowDispatcher::GetInternalState() {
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
str << " / " << actorId << "\n";
- str << " unread bytes " << sessionInfo.Stat.UnreadBytes << "\n";
+ str << " unread bytes " << sessionInfo.Stat.UnreadBytes << " restarts by offsets " << sessionInfo.Stat.RestartSessionByOffsets << "\n";
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index 33b6a9edb8..7d8c465b9a 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -34,6 +34,7 @@ struct TTopicSessionMetrics {
RowsRead = SubGroup->GetCounter("RowsRead", true);
InFlySubscribe = SubGroup->GetCounter("InFlySubscribe");
ReconnectRate = SubGroup->GetCounter("ReconnectRate", true);
+ RestartSessionByOffsets = counters->GetCounter("RestartSessionByOffsets", true);
}
~TTopicSessionMetrics() {
@@ -45,6 +46,7 @@ struct TTopicSessionMetrics {
::NMonitoring::TDynamicCounters::TCounterPtr RowsRead;
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate;
+ ::NMonitoring::TDynamicCounters::TCounterPtr RestartSessionByOffsets;
};
struct TEvPrivate {
@@ -171,6 +173,7 @@ private:
THashMap<TString, ui64> FieldsIndexes;
NYql::IPqGateway::TPtr PqGateway;
TMaybe<TString> ConsumerName;
+ ui64 RestartSessionByOffsets = 0;
public:
explicit TTopicSession(
@@ -750,6 +753,8 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
if (ReadSession) {
if (clientInfo.Settings.HasOffset() && (clientInfo.Settings.GetOffset() <= LastMessageOffset)) {
LOG_ROW_DISPATCHER_INFO("New client has less offset (" << clientInfo.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session");
+ Metrics.RestartSessionByOffsets->Inc();
+ ++RestartSessionByOffsets;
StopReadSession();
}
}
@@ -917,6 +922,7 @@ void TTopicSession::HandleException(const std::exception& e) {
void TTopicSession::SendStatistic() {
TopicSessionStatistic stat;
stat.Common.UnreadBytes = UnreadBytes;
+ stat.Common.RestartSessionByOffsets = RestartSessionByOffsets;
stat.SessionKey = TopicSessionParams{Endpoint, Database, TopicPath, PartitionId};
stat.Clients.reserve(Clients.size());
for (auto& [readActorId, info] : Clients) {