diff options
author | alexnick <alexnick@ydb.tech> | 2023-01-27 14:33:14 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2023-01-27 14:33:14 +0300 |
commit | 0c782b4d5eb3a2c8c081916e088d16e01d97e49c (patch) | |
tree | e66595074c117e460ac447483a23e3195bff0b55 | |
parent | 180cad240c3e53366d3dc63f9be43b5eedbd8476 (diff) | |
download | ydb-0c782b4d5eb3a2c8c081916e088d16e01d97e49c.tar.gz |
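Do not count sensors 6 times

Per-topic stream counters were registered by passing the list returned by GetLabelsForTopic as the aggregate-labels argument of TMultiCounter / TPercentileCounter, which apparently caused every increment to be recorded six times. This change replaces that list with plain subgroups from GetSubgroupsForTopic (database, cloud_id, folder_id, database_id, topic) and passes an empty aggregate list, so each event is counted once. The expected values in counters_datastreams.html shrink by a factor of six accordingly (e.g. 3240 -> 540, 180 -> 30).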
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 35 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 50 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/resources/counters_datastreams.html | 16 | ||||
-rw-r--r-- | ydb/library/persqueue/topic_parser/counters.cpp | 14 | ||||
-rw-r--r-- | ydb/library/persqueue/topic_parser/counters.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 20 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 9 |
7 files changed, 78 insertions, 68 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 4d71e2c6440..50e1a30a3a4 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -878,7 +878,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { void TPartition::SetupStreamCounters(const TActorContext& ctx) { const auto topicName = TopicConverter->GetModernName(); auto counters = AppData(ctx)->Counters; - auto labels = NPersQueue::GetLabelsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId); + auto subgroups = NPersQueue::GetSubgroupsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId); /* WriteBufferIsFullCounter.SetCounter( NPersQueue::GetCountersForTopic(counters, IsServerless), @@ -892,18 +892,22 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {"partition", ToString<ui32>(Partition)}}, {"name", "api.grpc.topic.stream_write.buffer_brimmed_milliseconds", true}); */ + + subgroups.push_back({"name", "topic.write.lag_milliseconds"}); + InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, - {{"name", "topic.write.lag_milliseconds"}}, "bin", + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, + subgroups, "bin", TVector<std::pair<ui64, TString>>{ {100, "100"}, {200, "200"}, {500, "500"}, {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, {10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"}, {180'000,"180000"}, {9'999'999, "999999"}}, true)); + subgroups.back().second = "topic.write.message_size_bytes"; MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, - {{"name", "topic.write.message_size_bytes"}}, "bin", + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, + subgroups, "bin", TVector<std::pair<ui64, TString>>{ {1024, "1024"}, {5120, "5120"}, {10'240, "10240"}, 
{20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"}, @@ -911,24 +915,25 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {2'097'152,"2097152"}, {5'242'880, "5242880"}, {10'485'760, "10485760"}, {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true)); + subgroups.pop_back(); BytesWrittenGrpc = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {}, + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, {"api.grpc.topic.stream_write.bytes"} , true, "name"); BytesWrittenTotal = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {}, + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, {"topic.write.bytes"} , true, "name"); MsgsWrittenGrpc = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {}, + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, {"api.grpc.topic.stream_write.messages"}, true, "name"); MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {}, + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, {"topic.write.messages"}, true, "name"); BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {}, + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, {"topic.write.uncompressed_bytes"}, true, "name"); TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; @@ -940,21 +945,23 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false); WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false); if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { + 
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"}); TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, - {{"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"}}, "bin", + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, + subgroups, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, {1000, "1000"}, {2500, "2500"}, {5000, "5000"}, {10'000, "10000"}, {9'999'999, "999999"}}, true)); + subgroups.pop_back(); } + subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}); PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForTopic(counters, IsServerless), labels, - {{"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}}, "bin", + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 1d4bfe198f5..7184f967cc3 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -305,9 +305,7 @@ struct TUserInfo { LabeledCounters.Reset(new TUserLabeledCounters( user + "||" + topicConverter->GetClientsideName(), partition, dbPath)); - if (DoInternalRead) { - SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, dbPath, isServerless, folderId); - } + SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, dbPath, isServerless, folderId); } else { LabeledCounters.Reset(new TUserLabeledCounters( user + "/" + (important ? 
"1" : "0") + "/" + topicConverter->GetClientsideName(), @@ -323,30 +321,36 @@ struct TUserInfo { const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId ) { auto subgroup = NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless); - auto aggregates = - NPersQueue::GetLabelsForTopic(TopicConverter, cloudId, dbId, dbPath, folderId); - - BytesRead = TMultiCounter(subgroup, - aggregates, {{"consumer", User}}, - {"api.grpc.topic.stream_read.bytes", - "topic.read.bytes"}, true, "name"); - MsgsRead = TMultiCounter(subgroup, - aggregates, {{"consumer", User}}, - {"api.grpc.topic.stream_read.messages", - "topic.read.messages"}, true, "name"); - Y_UNUSED(dcId); - Y_UNUSED(partition); + auto subgroups = + NPersQueue::GetSubgroupsForTopic(TopicConverter, cloudId, dbId, dbPath, folderId); + if (DoInternalRead) { + subgroups.push_back({"consumer", User}); + + BytesRead = TMultiCounter(subgroup, {}, subgroups, + {"api.grpc.topic.stream_read.bytes", + "topic.read.bytes"}, true, "name"); + MsgsRead = TMultiCounter(subgroup, {}, subgroups, + {"api.grpc.topic.stream_read.messages", + "topic.read.messages"}, true, "name"); + } else { + BytesRead = TMultiCounter(subgroup, {}, subgroups, + {"topic.read.bytes"}, true, "name"); + MsgsRead = TMultiCounter(subgroup, {}, subgroups, + {"topic.read.messages"}, true, "name"); + } + Y_UNUSED(dcId); + Y_UNUSED(partition); /* - Counter.SetCounter(subgroup, - {{"database", dbPath}, {"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId}, - {"topic", TopicConverter->GetFederationPath()}, - {"consumer", User}, {"host", dcId}, {"partition", partition}}, - {"name", "topic.read.awaiting_consume_milliseconds", true}); + Counter.SetCounter(subgroup, + {{"database", dbPath}, {"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId}, + {"topic", TopicConverter->GetFederationPath()}, + {"consumer", User}, {"host", dcId}, {"partition", partition}}, + {"name", 
"topic.read.awaiting_consume_milliseconds", true}); */ + subgroups.push_back({"name", "topic.read.lag_milliseconds"}); ReadTimeLag.reset(new TPercentileCounter( - NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless), aggregates, - {{"consumer", User}, {"name", "topic.read.lag_milliseconds"}}, "bin", + NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless), {}, subgroups, "bin", TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"}, {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, {10'000, "10000"}, diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html index 64e00643937..bdf68ef1ac4 100644 --- a/ydb/core/persqueue/ut/resources/counters_datastreams.html +++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html @@ -1,10 +1,10 @@ <pre> topic=topic: - name=api.grpc.topic.stream_write.bytes: 3240 - name=api.grpc.topic.stream_write.messages: 180 - name=topic.write.bytes: 3240 - name=topic.write.messages: 180 - name=topic.write.uncompressed_bytes: 1620 + name=api.grpc.topic.stream_write.bytes: 540 + name=api.grpc.topic.stream_write.messages: 30 + name=topic.write.bytes: 540 + name=topic.write.messages: 30 + name=topic.write.uncompressed_bytes: 270 consumer=user: name=api.grpc.topic.stream_read.bytes: 0 @@ -26,7 +26,7 @@ topic=topic: bin=999999: 0 name=api.grpc.topic.stream_write.partition_throttled_milliseconds: - bin=0: 180 + bin=0: 30 bin=1: 0 bin=10: 0 bin=100: 0 @@ -41,7 +41,7 @@ topic=topic: bin=999999: 0 name=topic.write.lag_milliseconds: - bin=100: 180 + bin=100: 30 bin=1000: 0 bin=10000: 0 bin=180000: 0 @@ -54,7 +54,7 @@ topic=topic: bin=999999: 0 name=topic.write.message_size_bytes: - bin=1024: 180 + bin=1024: 30 bin=10240: 0 bin=102400: 0 bin=1048576: 0 diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp index 5f78745bdec..479812b04f0 100644 --- 
a/ydb/library/persqueue/topic_parser/counters.cpp +++ b/ydb/library/persqueue/topic_parser/counters.cpp @@ -37,14 +37,14 @@ TVector<TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic) return GetLabelsForCustomCluster(topic, topic->GetCluster()); } -TVector<TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId, +TVector<std::pair<TString, TString>> GetSubgroupsForTopic(const TTopicConverterPtr& topic, const TString& cloudId, const TString& dbId, const TString& dbPath, const TString& folderId) { - TVector<TPQLabelsInfo> res = { - {{{"database", dbPath}}, {dbPath}}, - {{{"cloud_id", cloudId}}, {cloudId}}, - {{{"folder_id", folderId}}, {folderId}}, - {{{"database_id", dbId}}, {dbId}}, - {{{"topic", topic->GetClientsideName()}}, {topic->GetClientsideName()}}}; + TVector<std::pair<TString, TString>> res = { + {"database", dbPath}, + {"cloud_id", cloudId}, + {"folder_id", folderId}, + {"database_id", dbId}, + {"topic", topic->GetClientsideName()}}; return res; } diff --git a/ydb/library/persqueue/topic_parser/counters.h b/ydb/library/persqueue/topic_parser/counters.h index 54572875214..8367e10f821 100644 --- a/ydb/library/persqueue/topic_parser/counters.h +++ b/ydb/library/persqueue/topic_parser/counters.h @@ -9,7 +9,7 @@ namespace NPersQueue { TVector<NPersQueue::TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic); //TVector<NPersQueue::TPQLabelsInfo> GetLabelsForLegacyName(const TString& topic); TVector<NPersQueue::TPQLabelsInfo> GetLabelsForCustomCluster(const TTopicConverterPtr& topic, TString cluster); -TVector<NPersQueue::TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId, +TVector<std::pair<TString, TString>> GetSubgroupsForTopic(const TTopicConverterPtr& topic, const TString& cloudId, const TString& dbId, const TString& dbPath, const TString& folderId); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp 
index 0c772d4c8bb..ebe6f3764df 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -813,16 +813,16 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu { auto& topicCounters = TopicCounters[topic->GetInternalName()]; auto subGroup = NPersQueue::GetCountersForTopic(Counters, isServerless); - auto aggr = NPersQueue::GetLabelsForTopic(topic, cloudId, dbId, dbPath, folderId); - const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}}; - - topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.started"}, true, "name"); - topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.stopped"}, true, "name"); - topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.stopping_count"}, false, "name"); - topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.starting_count"}, false, "name"); - topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.count"}, false, "name"); - topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.errors"}, true, "name"); - topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.commits"}, true, "name"); + auto subgroups = NPersQueue::GetSubgroupsForTopic(topic, cloudId, dbId, dbPath, folderId); + subgroups.push_back({"consumer", ClientPath}); + + topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.started"}, true, "name"); + topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, {}, subgroups, 
{"api.grpc.topic.stream_read.partition_session.stopped"}, true, "name"); + topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.stopping_count"}, false, "name"); + topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.starting_count"}, false, "name"); + topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.count"}, false, "name"); + topicCounters.Errors = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.errors"}, true, "name"); + topicCounters.Commits = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.commits"}, true, "name"); topicCounters.CommitLatency = CommitLatency; topicCounters.SLIBigLatency = SLIBigLatency; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 429f329ba3b..0e090790a6f 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -531,12 +531,11 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou //now topic is checked, can create group for real topic, not garbage auto subGroup = NPersQueue::GetCountersForTopic(Counters, isServerless); - auto aggr = NPersQueue::GetLabelsForTopic(FullConverter, cloudId, dbId, dbPath, folderId); - - SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.grpc.topic.stream_write.sessions_created"}, true, "name"); - SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.grpc.topic.stream_write.sessions_active_count"}, false, "name"); - Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.grpc.topic.stream_write.errors"}, true, "name"); + auto subgroups = NPersQueue::GetSubgroupsForTopic(FullConverter, 
cloudId, dbId, dbPath, folderId); + SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_write.sessions_created"}, true, "name"); + SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_write.sessions_active_count"}, false, "name"); + Errors = NKikimr::NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_write.errors"}, true, "name"); SessionsCreated.Inc(); SessionsActive.Inc(); |