about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author alexnick <alexnick@ydb.tech> 2023-01-27 14:33:14 +0300
committer alexnick <alexnick@ydb.tech> 2023-01-27 14:33:14 +0300
commit 0c782b4d5eb3a2c8c081916e088d16e01d97e49c (patch)
tree e66595074c117e460ac447483a23e3195bff0b55
parent 180cad240c3e53366d3dc63f9be43b5eedbd8476 (diff)
download ydb-0c782b4d5eb3a2c8c081916e088d16e01d97e49c.tar.gz
do not count sensors 6 times
do not count sensors 6 times
-rw-r--r-- ydb/core/persqueue/partition.cpp 35
-rw-r--r-- ydb/core/persqueue/user_info.h 50
-rw-r--r-- ydb/core/persqueue/ut/resources/counters_datastreams.html 16
-rw-r--r-- ydb/library/persqueue/topic_parser/counters.cpp 14
-rw-r--r-- ydb/library/persqueue/topic_parser/counters.h 2
-rw-r--r-- ydb/services/persqueue_v1/actors/read_session_actor.ipp 20
-rw-r--r-- ydb/services/persqueue_v1/actors/write_session_actor.ipp 9
7 files changed, 78 insertions, 68 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 4d71e2c6440..50e1a30a3a4 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -878,7 +878,7 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
void TPartition::SetupStreamCounters(const TActorContext& ctx) {
const auto topicName = TopicConverter->GetModernName();
auto counters = AppData(ctx)->Counters;
- auto labels = NPersQueue::GetLabelsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId);
+ auto subgroups = NPersQueue::GetSubgroupsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId);
/*
WriteBufferIsFullCounter.SetCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless),
@@ -892,18 +892,22 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{"partition", ToString<ui32>(Partition)}},
{"name", "api.grpc.topic.stream_write.buffer_brimmed_milliseconds", true});
*/
+
+ subgroups.push_back({"name", "topic.write.lag_milliseconds"});
+
InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels,
- {{"name", "topic.write.lag_milliseconds"}}, "bin",
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {},
+ subgroups, "bin",
TVector<std::pair<ui64, TString>>{
{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"}, {5000, "5000"},
{10'000, "10000"}, {30'000, "30000"}, {60'000, "60000"},
{180'000,"180000"}, {9'999'999, "999999"}}, true));
+ subgroups.back().second = "topic.write.message_size_bytes";
MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels,
- {{"name", "topic.write.message_size_bytes"}}, "bin",
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {},
+ subgroups, "bin",
TVector<std::pair<ui64, TString>>{
{1024, "1024"}, {5120, "5120"}, {10'240, "10240"},
{20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"},
@@ -911,24 +915,25 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{2'097'152,"2097152"}, {5'242'880, "5242880"}, {10'485'760, "10485760"},
{67'108'864, "67108864"}, {999'999'999, "99999999"}}, true));
+ subgroups.pop_back();
BytesWrittenGrpc = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {},
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
{"api.grpc.topic.stream_write.bytes"} , true, "name");
BytesWrittenTotal = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {},
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
{"topic.write.bytes"} , true, "name");
MsgsWrittenGrpc = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {},
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
{"api.grpc.topic.stream_write.messages"}, true, "name");
MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {},
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
{"topic.write.messages"}, true, "name");
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {},
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
{"topic.write.uncompressed_bytes"}, true, "name");
TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
@@ -940,21 +945,23 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
+ subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels,
- {{"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"}}, "bin",
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {},
+ subgroups, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
{20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
{1000, "1000"}, {2500, "2500"}, {5000, "5000"},
{10'000, "10000"}, {9'999'999, "999999"}}, true));
+ subgroups.pop_back();
}
+ subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForTopic(counters, IsServerless), labels,
- {{"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}}, "bin",
+ NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
{20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 1d4bfe198f5..7184f967cc3 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -305,9 +305,7 @@ struct TUserInfo {
LabeledCounters.Reset(new TUserLabeledCounters(
user + "||" + topicConverter->GetClientsideName(), partition, dbPath));
- if (DoInternalRead) {
- SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, dbPath, isServerless, folderId);
- }
+ SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, dbPath, isServerless, folderId);
} else {
LabeledCounters.Reset(new TUserLabeledCounters(
user + "/" + (important ? "1" : "0") + "/" + topicConverter->GetClientsideName(),
@@ -323,30 +321,36 @@ struct TUserInfo {
const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId
) {
auto subgroup = NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless);
- auto aggregates =
- NPersQueue::GetLabelsForTopic(TopicConverter, cloudId, dbId, dbPath, folderId);
-
- BytesRead = TMultiCounter(subgroup,
- aggregates, {{"consumer", User}},
- {"api.grpc.topic.stream_read.bytes",
- "topic.read.bytes"}, true, "name");
- MsgsRead = TMultiCounter(subgroup,
- aggregates, {{"consumer", User}},
- {"api.grpc.topic.stream_read.messages",
- "topic.read.messages"}, true, "name");
- Y_UNUSED(dcId);
- Y_UNUSED(partition);
+ auto subgroups =
+ NPersQueue::GetSubgroupsForTopic(TopicConverter, cloudId, dbId, dbPath, folderId);
+ if (DoInternalRead) {
+ subgroups.push_back({"consumer", User});
+
+ BytesRead = TMultiCounter(subgroup, {}, subgroups,
+ {"api.grpc.topic.stream_read.bytes",
+ "topic.read.bytes"}, true, "name");
+ MsgsRead = TMultiCounter(subgroup, {}, subgroups,
+ {"api.grpc.topic.stream_read.messages",
+ "topic.read.messages"}, true, "name");
+ } else {
+ BytesRead = TMultiCounter(subgroup, {}, subgroups,
+ {"topic.read.bytes"}, true, "name");
+ MsgsRead = TMultiCounter(subgroup, {}, subgroups,
+ {"topic.read.messages"}, true, "name");
+ }
+ Y_UNUSED(dcId);
+ Y_UNUSED(partition);
/*
- Counter.SetCounter(subgroup,
- {{"database", dbPath}, {"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId},
- {"topic", TopicConverter->GetFederationPath()},
- {"consumer", User}, {"host", dcId}, {"partition", partition}},
- {"name", "topic.read.awaiting_consume_milliseconds", true});
+ Counter.SetCounter(subgroup,
+ {{"database", dbPath}, {"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId},
+ {"topic", TopicConverter->GetFederationPath()},
+ {"consumer", User}, {"host", dcId}, {"partition", partition}},
+ {"name", "topic.read.awaiting_consume_milliseconds", true});
*/
+ subgroups.push_back({"name", "topic.read.lag_milliseconds"});
ReadTimeLag.reset(new TPercentileCounter(
- NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless), aggregates,
- {{"consumer", User}, {"name", "topic.read.lag_milliseconds"}}, "bin",
+ NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless), {}, subgroups, "bin",
TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"},
{5000, "5000"}, {10'000, "10000"},
diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html
index 64e00643937..bdf68ef1ac4 100644
--- a/ydb/core/persqueue/ut/resources/counters_datastreams.html
+++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html
@@ -1,10 +1,10 @@
<pre>
topic=topic:
- name=api.grpc.topic.stream_write.bytes: 3240
- name=api.grpc.topic.stream_write.messages: 180
- name=topic.write.bytes: 3240
- name=topic.write.messages: 180
- name=topic.write.uncompressed_bytes: 1620
+ name=api.grpc.topic.stream_write.bytes: 540
+ name=api.grpc.topic.stream_write.messages: 30
+ name=topic.write.bytes: 540
+ name=topic.write.messages: 30
+ name=topic.write.uncompressed_bytes: 270
consumer=user:
name=api.grpc.topic.stream_read.bytes: 0
@@ -26,7 +26,7 @@ topic=topic:
bin=999999: 0
name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
- bin=0: 180
+ bin=0: 30
bin=1: 0
bin=10: 0
bin=100: 0
@@ -41,7 +41,7 @@ topic=topic:
bin=999999: 0
name=topic.write.lag_milliseconds:
- bin=100: 180
+ bin=100: 30
bin=1000: 0
bin=10000: 0
bin=180000: 0
@@ -54,7 +54,7 @@ topic=topic:
bin=999999: 0
name=topic.write.message_size_bytes:
- bin=1024: 180
+ bin=1024: 30
bin=10240: 0
bin=102400: 0
bin=1048576: 0
diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp
index 5f78745bdec..479812b04f0 100644
--- a/ydb/library/persqueue/topic_parser/counters.cpp
+++ b/ydb/library/persqueue/topic_parser/counters.cpp
@@ -37,14 +37,14 @@ TVector<TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic)
return GetLabelsForCustomCluster(topic, topic->GetCluster());
}
-TVector<TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId,
+TVector<std::pair<TString, TString>> GetSubgroupsForTopic(const TTopicConverterPtr& topic, const TString& cloudId,
const TString& dbId, const TString& dbPath, const TString& folderId) {
- TVector<TPQLabelsInfo> res = {
- {{{"database", dbPath}}, {dbPath}},
- {{{"cloud_id", cloudId}}, {cloudId}},
- {{{"folder_id", folderId}}, {folderId}},
- {{{"database_id", dbId}}, {dbId}},
- {{{"topic", topic->GetClientsideName()}}, {topic->GetClientsideName()}}};
+ TVector<std::pair<TString, TString>> res = {
+ {"database", dbPath},
+ {"cloud_id", cloudId},
+ {"folder_id", folderId},
+ {"database_id", dbId},
+ {"topic", topic->GetClientsideName()}};
return res;
}
diff --git a/ydb/library/persqueue/topic_parser/counters.h b/ydb/library/persqueue/topic_parser/counters.h
index 54572875214..8367e10f821 100644
--- a/ydb/library/persqueue/topic_parser/counters.h
+++ b/ydb/library/persqueue/topic_parser/counters.h
@@ -9,7 +9,7 @@ namespace NPersQueue {
TVector<NPersQueue::TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic);
//TVector<NPersQueue::TPQLabelsInfo> GetLabelsForLegacyName(const TString& topic);
TVector<NPersQueue::TPQLabelsInfo> GetLabelsForCustomCluster(const TTopicConverterPtr& topic, TString cluster);
-TVector<NPersQueue::TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId,
+TVector<std::pair<TString, TString>> GetSubgroupsForTopic(const TTopicConverterPtr& topic, const TString& cloudId,
const TString& dbId, const TString& dbPath,
const TString& folderId);
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 0c772d4c8bb..ebe6f3764df 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -813,16 +813,16 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu
{
auto& topicCounters = TopicCounters[topic->GetInternalName()];
auto subGroup = NPersQueue::GetCountersForTopic(Counters, isServerless);
- auto aggr = NPersQueue::GetLabelsForTopic(topic, cloudId, dbId, dbPath, folderId);
- const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}};
-
- topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.started"}, true, "name");
- topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.stopped"}, true, "name");
- topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.stopping_count"}, false, "name");
- topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.starting_count"}, false, "name");
- topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.count"}, false, "name");
- topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.partition_session.errors"}, true, "name");
- topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.grpc.topic.stream_read.commits"}, true, "name");
+ auto subgroups = NPersQueue::GetSubgroupsForTopic(topic, cloudId, dbId, dbPath, folderId);
+ subgroups.push_back({"consumer", ClientPath});
+
+ topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.started"}, true, "name");
+ topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.stopped"}, true, "name");
+ topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.stopping_count"}, false, "name");
+ topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.starting_count"}, false, "name");
+ topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.count"}, false, "name");
+ topicCounters.Errors = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.partition_session.errors"}, true, "name");
+ topicCounters.Commits = NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_read.commits"}, true, "name");
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 429f329ba3b..0e090790a6f 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -531,12 +531,11 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou
//now topic is checked, can create group for real topic, not garbage
auto subGroup = NPersQueue::GetCountersForTopic(Counters, isServerless);
- auto aggr = NPersQueue::GetLabelsForTopic(FullConverter, cloudId, dbId, dbPath, folderId);
-
- SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.grpc.topic.stream_write.sessions_created"}, true, "name");
- SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.grpc.topic.stream_write.sessions_active_count"}, false, "name");
- Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.grpc.topic.stream_write.errors"}, true, "name");
+ auto subgroups = NPersQueue::GetSubgroupsForTopic(FullConverter, cloudId, dbId, dbPath, folderId);
+ SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_write.sessions_created"}, true, "name");
+ SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_write.sessions_active_count"}, false, "name");
+ Errors = NKikimr::NPQ::TMultiCounter(subGroup, {}, subgroups, {"api.grpc.topic.stream_write.errors"}, true, "name");
SessionsCreated.Inc();
SessionsActive.Inc();