aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@ydb.tech>2022-12-07 14:33:38 +0300
committermokhotskii <mokhotskii@ydb.tech>2022-12-07 14:33:38 +0300
commit0c53a38909eec7c5872384fa373409c8a9ecd681 (patch)
tree8fb790eb7872b520e48cba5a11ea17268f8dee3e
parent3887283e448f5a9b05c6e3d062e482b84b241f17 (diff)
downloadydb-0c53a38909eec7c5872384fa373409c8a9ecd681.tar.gz
Switch to cloud_id, folder_id, database_id in PQ
Switch to cloud_id, folder_id, database_id in PQ Update metrics names introduce new names replace stream. prefix with topic. prefix introduce topic counters in http proxy alongside stream. counters
-rw-r--r--ydb/core/http_proxy/custom_metrics.h86
-rw-r--r--ydb/core/http_proxy/http_req.cpp33
-rw-r--r--ydb/core/persqueue/partition.cpp25
-rw-r--r--ydb/core/persqueue/user_info.h8
-rw-r--r--ydb/core/persqueue/ut/resources/counters_datastreams.html154
-rw-r--r--ydb/library/persqueue/tests/counters.cpp6
-rw-r--r--ydb/library/persqueue/topic_parser/counters.cpp10
-rw-r--r--ydb/library/persqueue/topic_parser/counters.h6
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp4
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp4
10 files changed, 117 insertions, 219 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h
index df27e091cc..39ddcba5a3 100644
--- a/ydb/core/http_proxy/custom_metrics.h
+++ b/ydb/core/http_proxy/custom_metrics.h
@@ -16,28 +16,20 @@ void FillOutputCustomMetrics(const TProtoResult& result, const THttpRequestConte
Y_UNUSED(result, httpContext, ctx);
}
-TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name, bool setTopicPrefix = true) {
- const TString topicPrefix = setTopicPrefix ? "topic" : "stream";
+TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name) {
if (method.empty()) {
- return {{"cloud", httpContext.CloudId}, {"folder", httpContext.FolderId},
- {"database", httpContext.DatabaseId}, {topicPrefix, httpContext.StreamName},
- {"name", name}};
+ return {{"database", httpContext.DatabaseName}, {"cloud_id", httpContext.CloudId},
+ {"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId},
+ {"topic", httpContext.StreamName}, {"name", name}};
}
- return {{"method", method}, {"cloud", httpContext.CloudId}, {"folder", httpContext.FolderId},
- {"database", httpContext.DatabaseId}, {topicPrefix, httpContext.StreamName},
- {"name", name}};
+ return {{"database", httpContext.DatabaseName}, {"method", method}, {"cloud_id", httpContext.CloudId},
+ {"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId},
+ {"topic", httpContext.StreamName}, {"name", name}};
}
-static const bool setStreamPrefix{false};
-
template <>
void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) {
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{request.records_size(), true, true,
- BuildLabels("", httpContext, "stream.incoming_records_per_second", setStreamPrefix)
- });
-
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{request.records_size(), true, true,
BuildLabels("", httpContext, "topic.written_messages_per_second")
@@ -48,19 +40,11 @@ void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request,
bytes += rec.data().size() + rec.partition_key().size() + rec.explicit_hash_key().size();
}
- /* deprecated metric */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.incoming_bytes_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
BuildLabels("", httpContext, "topic.written_bytes_per_second")
});
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.put_records.bytes_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
BuildLabels("", httpContext, "api.data_streams.put_records.bytes_per_second")
@@ -69,18 +53,10 @@ void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request,
template <>
void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) {
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.incoming_records_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
BuildLabels("", httpContext, "topic.written_messages_per_second")
});
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.put_record.records_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
BuildLabels("", httpContext, "api.data_streams.put_record.messages_per_second")
@@ -88,19 +64,11 @@ void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, c
i64 bytes = request.data().size() + request.partition_key().size() + request.explicit_hash_key().size();
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.incoming_bytes_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
BuildLabels("", httpContext, "topic.written_bytes_per_second")
});
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.put_record.bytes_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
BuildLabels("", httpContext, "api.data_streams.put_record.bytes_per_second")
@@ -111,10 +79,6 @@ void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, c
template <>
void FillOutputCustomMetrics<PutRecordResult>(const PutRecordResult& result, const THttpRequestContext& httpContext, const TActorContext& ctx) {
Y_UNUSED(result);
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.put_record.success_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
BuildLabels("", httpContext, "api.data_streams.put_record.success_per_second")
@@ -127,37 +91,21 @@ void FillOutputCustomMetrics<PutRecordsResult>(const PutRecordsResult& result, c
i64 failed = result.failed_record_count();
i64 success = result.records_size() - failed;
if (success > 0) {
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.put_records.success_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
BuildLabels("", httpContext, "api.data_streams.put_records.success_per_second")
});
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{success, true, true,
- BuildLabels("", httpContext, "stream.put_records.successfull_records_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{success, true, true,
BuildLabels("", httpContext, "api.data_streams.put_records.successfull_messages_per_second")
});
}
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{result.records_size(), true, true,
- BuildLabels("", httpContext, "stream.put_records.total_records_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{result.records_size(), true, true,
BuildLabels("", httpContext, "api.data_streams.put_records.total_messages_per_second")
});
if (failed > 0) {
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{failed, true, true,
- BuildLabels("", httpContext, "streams.put_records.failed_records_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{failed, true, true,
BuildLabels("", httpContext, "api.data_streams.put_records.failed_messages_per_second")
@@ -178,26 +126,6 @@ void FillOutputCustomMetrics<GetRecordsResult>(const GetRecordsResult& result, c
;
});
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.get_records.success_per_second", setStreamPrefix)}
- );
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{records_n, true, true,
- BuildLabels("", httpContext, "stream.get_records.records_per_second", setStreamPrefix)}
- );
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.get_records.bytes_per_second", setStreamPrefix)}
- );
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{records_n, true, true,
- BuildLabels("", httpContext, "stream.outgoing_records_per_second", setStreamPrefix)}
- );
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.outgoing_bytes_per_second", setStreamPrefix)}
- );
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
BuildLabels("", httpContext, "api.data_streams.get_records.success_per_second")}
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index ab90864659..269bd9a582 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -392,25 +392,14 @@ namespace NKikimr::NHttpProxy {
}
void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText) {
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{
- 1, true, true,
- {{"method", Method},
- {"cloud", HttpContext.CloudId},
- {"folder", HttpContext.FolderId},
- {"database", HttpContext.DatabaseId},
- {"stream", HttpContext.StreamName},
- {"code", TStringBuilder() << (int)StatusToHttpCode(status)},
- {"name", "api.http.errors_per_second"}}
- });
-
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
- {{"method", Method},
- {"cloud", HttpContext.CloudId},
- {"folder", HttpContext.FolderId},
- {"database", HttpContext.DatabaseId},
+ {{"database", HttpContext.DatabaseName},
+ {"method", Method},
+ {"cloud_id", HttpContext.CloudId},
+ {"folder_id", HttpContext.FolderId},
+ {"database_id", HttpContext.DatabaseId},
{"topic", HttpContext.StreamName},
{"code", TStringBuilder() << (int)StatusToHttpCode(status)},
{"name", "api.data_streams.errors_per_second"}}
@@ -443,10 +432,6 @@ namespace NKikimr::NHttpProxy {
}
FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx);
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels(Method, HttpContext, "api.http.requests_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
BuildLabels(Method, HttpContext, "api.data_streams.requests_per_second")
@@ -460,10 +445,6 @@ namespace NKikimr::NHttpProxy {
void ReportLatencyCounters(const TActorContext& ctx) {
TDuration dur = ctx.Now() - StartTime;
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1,
- BuildLabels(Method, HttpContext, "api.http.requests_duration_milliseconds", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1,
BuildLabels(Method, HttpContext, "api.data_streams.requests_duration_milliseconds")
@@ -478,10 +459,6 @@ namespace NKikimr::NHttpProxy {
*(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
ReportLatencyCounters(ctx);
- /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
- new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels(Method, HttpContext, "api.http.success_per_second", setStreamPrefix)
- });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
BuildLabels(Method, HttpContext, "api.data_streams.success_per_second")
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 9ae32f6cec..40a7b6f535 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -851,20 +851,20 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
void TPartition::SetupStreamCounters(const TActorContext& ctx) {
const auto topicName = TopicConverter->GetModernName();
auto counters = AppData(ctx)->Counters;
- auto labels = NPersQueue::GetLabelsForStream(TopicConverter, CloudId, DbId, FolderId);
+ auto labels = NPersQueue::GetLabelsForTopic(TopicConverter, CloudId, DbId, FolderId);
WriteBufferIsFullCounter.SetCounter(
- NPersQueue::GetCountersForStream(counters),
- {{"cloud", CloudId},
- {"folder", FolderId},
- {"database", DbId},
+ NPersQueue::GetCountersForDataStream(counters),
+ {{"cloud_id", CloudId},
+ {"folder_id", FolderId},
+ {"database_id", DbId},
{"topic", TopicConverter->GetFederationPath()},
{"host", DCId},
{"partition", ToString<ui32>(Partition)}},
{"name", "api.topic_service.stream_write.buffer_brimmed_milliseconds", true});
InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForStream(counters), labels,
+ NPersQueue::GetCountersForDataStream(counters), labels,
{{"name", "topic.write_lag_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{100, "100"}, {200, "200"}, {500, "500"},
@@ -873,7 +873,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{180'000,"180000"}, {9'999'999, "999999"}}, true));
MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForStream(counters), labels,
+ NPersQueue::GetCountersForDataStream(counters), labels,
{{"name", "topic.written_message_size_bytes"}}, "bin",
TVector<std::pair<ui64, TString>>{
{1024, "1024"}, {5120, "5120"}, {10'240, "10240"},
@@ -883,18 +883,17 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{67'108'864, "67108864"}, {999'999'999, "99999999"}}, true));
BytesWritten = NKikimr::NPQ::TMultiCounter(
-
- NPersQueue::GetCountersForStream(counters), labels, {},
+ NPersQueue::GetCountersForDataStream(counters), labels, {},
{"api.topic_service.stream_write.bytes_per_second",
"topic.written_bytes_per_second"} , true, "name");
MsgsWritten = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForStream(counters), labels, {},
+ NPersQueue::GetCountersForDataStream(counters), labels, {},
{"api.topic_service.stream_write.messages_per_second",
"topic.written_messages_per_second"}, true, "name");
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForStream(counters), labels, {},
+ NPersQueue::GetCountersForDataStream(counters), labels, {},
{"topic.written_uncompressed_bytes_per_second"}, true, "name");
TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
@@ -908,7 +907,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForStream(counters), labels,
+ NPersQueue::GetCountersForDataStream(counters), labels,
{{"name", "api.topic_service.stream_write.topic_throttled_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
@@ -919,7 +918,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForStream(counters), labels,
+ NPersQueue::GetCountersForDataStream(counters), labels,
{{"name", "api.topic_service.stream_write.partition_throttled_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index c9993792bd..d9559779c9 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -322,9 +322,9 @@ struct TUserInfo {
const TActorContext& ctx, const TString& dcId, const TString& partition,
const TString& cloudId, const TString& dbId, const TString& folderId
) {
- auto subgroup = NPersQueue::GetCountersForStream(AppData(ctx)->Counters);
+ auto subgroup = NPersQueue::GetCountersForDataStream(AppData(ctx)->Counters);
auto aggregates =
- NPersQueue::GetLabelsForStream(TopicConverter, cloudId, dbId, folderId);
+ NPersQueue::GetLabelsForTopic(TopicConverter, cloudId, dbId, folderId);
BytesRead = TMultiCounter(subgroup,
aggregates, {{"consumer", User}},
@@ -336,13 +336,13 @@ struct TUserInfo {
"topic.read_messages_per_second"}, true, "name");
Counter.SetCounter(subgroup,
- {{"cloud", cloudId}, {"folder", folderId}, {"database", dbId},
+ {{"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId},
{"topic", TopicConverter->GetFederationPath()},
{"consumer", User}, {"host", dcId}, {"partition", partition}},
{"name", "topic.awaiting_consume_milliseconds", true});
ReadTimeLag.reset(new TPercentileCounter(
- NPersQueue::GetCountersForStream(AppData(ctx)->Counters), aggregates,
+ NPersQueue::GetCountersForDataStream(AppData(ctx)->Counters), aggregates,
{{"consumer", User}, {"name", "topic.read_lag_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"},
diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html
index b52e418d24..7c39ad45a5 100644
--- a/ydb/core/persqueue/ut/resources/counters_datastreams.html
+++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html
@@ -1,93 +1,87 @@
<pre>
-cloud=somecloud:
+topic=topic:
+ name=api.topic_service.stream_write.bytes_per_second: 2700
+ name=api.topic_service.stream_write.messages_per_second: 150
+ name=topic.written_bytes_per_second: 2700
+ name=topic.written_messages_per_second: 150
+ name=topic.written_uncompressed_bytes_per_second: 1350
- folder=somefolder:
+ consumer=user:
+ name=api.topic_service.stream_read.bytes_per_second: 0
+ name=api.topic_service.stream_read.messages_per_second: 0
+ name=topic.read_bytes_per_second: 0
+ name=topic.read_messages_per_second: 0
- database=PQ:
+ host=1:
- topic=topic:
- name=api.topic_service.stream_write.bytes_per_second: 2700
- name=api.topic_service.stream_write.messages_per_second: 150
- name=topic.written_bytes_per_second: 2700
- name=topic.written_messages_per_second: 150
- name=topic.written_uncompressed_bytes_per_second: 1350
+ partition=0:
+ name=topic.awaiting_consume_milliseconds: 0
- consumer=user:
- name=api.topic_service.stream_read.bytes_per_second: 0
- name=api.topic_service.stream_read.messages_per_second: 0
- name=topic.read_bytes_per_second: 0
- name=topic.read_messages_per_second: 0
+ partition=1:
+ name=topic.awaiting_consume_milliseconds: 0
- host=1:
+ name=topic.read_lag_milliseconds:
+ bin=100: 0
+ bin=1000: 0
+ bin=10000: 0
+ bin=180000: 0
+ bin=200: 0
+ bin=2000: 0
+ bin=30000: 0
+ bin=500: 0
+ bin=5000: 0
+ bin=60000: 0
+ bin=999999: 0
- partition=0:
- name=topic.awaiting_consume_milliseconds: 0
+ host=1:
- partition=1:
- name=topic.awaiting_consume_milliseconds: 0
+ partition=0:
+ name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0
- name=topic.read_lag_milliseconds:
- bin=100: 0
- bin=1000: 0
- bin=10000: 0
- bin=180000: 0
- bin=200: 0
- bin=2000: 0
- bin=30000: 0
- bin=500: 0
- bin=5000: 0
- bin=60000: 0
- bin=999999: 0
+ partition=1:
+ name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0
- host=1:
+ name=api.topic_service.stream_write.partition_throttled_milliseconds:
+ bin=0: 150
+ bin=1: 0
+ bin=10: 0
+ bin=100: 0
+ bin=1000: 0
+ bin=10000: 0
+ bin=20: 0
+ bin=2500: 0
+ bin=5: 0
+ bin=50: 0
+ bin=500: 0
+ bin=5000: 0
+ bin=999999: 0
- partition=0:
- name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0
+ name=topic.write_lag_milliseconds:
+ bin=100: 150
+ bin=1000: 0
+ bin=10000: 0
+ bin=180000: 0
+ bin=200: 0
+ bin=2000: 0
+ bin=30000: 0
+ bin=500: 0
+ bin=5000: 0
+ bin=60000: 0
+ bin=999999: 0
- partition=1:
- name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0
-
- name=api.topic_service.stream_write.partition_throttled_milliseconds:
- bin=0: 150
- bin=1: 0
- bin=10: 0
- bin=100: 0
- bin=1000: 0
- bin=10000: 0
- bin=20: 0
- bin=2500: 0
- bin=5: 0
- bin=50: 0
- bin=500: 0
- bin=5000: 0
- bin=999999: 0
-
- name=topic.write_lag_milliseconds:
- bin=100: 150
- bin=1000: 0
- bin=10000: 0
- bin=180000: 0
- bin=200: 0
- bin=2000: 0
- bin=30000: 0
- bin=500: 0
- bin=5000: 0
- bin=60000: 0
- bin=999999: 0
-
- name=topic.written_message_size_bytes:
- bin=1024: 150
- bin=10240: 0
- bin=102400: 0
- bin=1048576: 0
- bin=10485760: 0
- bin=20480: 0
- bin=204800: 0
- bin=2097152: 0
- bin=5120: 0
- bin=51200: 0
- bin=524288: 0
- bin=5242880: 0
- bin=67108864: 0
- bin=99999999: 0
+ name=topic.written_message_size_bytes:
+ bin=1024: 150
+ bin=10240: 0
+ bin=102400: 0
+ bin=1048576: 0
+ bin=10485760: 0
+ bin=20480: 0
+ bin=204800: 0
+ bin=2097152: 0
+ bin=5120: 0
+ bin=51200: 0
+ bin=524288: 0
+ bin=5242880: 0
+ bin=67108864: 0
+ bin=99999999: 0
</pre>
diff --git a/ydb/library/persqueue/tests/counters.cpp b/ydb/library/persqueue/tests/counters.cpp
index 28600f0f73..2cddb6a157 100644
--- a/ydb/library/persqueue/tests/counters.cpp
+++ b/ydb/library/persqueue/tests/counters.cpp
@@ -96,9 +96,9 @@ NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
TStringBuilder queryBuilder;
queryBuilder <<
"/counters/counters=" << counters <<
- "/cloud=" << cloudId <<
- "/folder=" << folderId <<
- "/database=" << databaseId <<
+ "/cloud_id=" << cloudId <<
+ "/folder_id=" << folderId <<
+ "/database_id=" << databaseId <<
"/topic=" << JoinRange("%2F", pathItems.begin(), pathItems.end());
if (consumer) {
diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp
index 695d1c7a66..ded7dbf7a8 100644
--- a/ydb/library/persqueue/topic_parser/counters.cpp
+++ b/ydb/library/persqueue/topic_parser/counters.cpp
@@ -15,7 +15,7 @@ namespace NPersQueue {
->GetSubgroup("Topic", topic->GetShortClientsideName());
}
-::NMonitoring::TDynamicCounterPtr GetCountersForStream(::NMonitoring::TDynamicCounterPtr counters)
+::NMonitoring::TDynamicCounterPtr GetCountersForDataStream(::NMonitoring::TDynamicCounterPtr counters)
{
return counters->GetSubgroup("counters", "datastreams");
}
@@ -37,12 +37,12 @@ TVector<TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic)
return GetLabelsForCustomCluster(topic, topic->GetCluster());
}
-TVector<TPQLabelsInfo> GetLabelsForStream(const TTopicConverterPtr& topic, const TString& cloudId,
+TVector<TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId,
const TString& dbId, const TString& folderId) {
TVector<TPQLabelsInfo> res = {
- {{{"cloud", cloudId}}, {cloudId}},
- {{{"folder", folderId}}, {folderId}},
- {{{"database", dbId}}, {dbId}},
+ {{{"cloud_id", cloudId}}, {cloudId}},
+ {{{"folder_id", folderId}}, {folderId}},
+ {{{"database_id", dbId}}, {dbId}},
{{{"topic", topic->GetClientsideName()}}, {topic->GetClientsideName()}}};
return res;
}
diff --git a/ydb/library/persqueue/topic_parser/counters.h b/ydb/library/persqueue/topic_parser/counters.h
index 32af523141..435fb61d88 100644
--- a/ydb/library/persqueue/topic_parser/counters.h
+++ b/ydb/library/persqueue/topic_parser/counters.h
@@ -9,11 +9,11 @@ namespace NPersQueue {
TVector<NPersQueue::TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic);
//TVector<NPersQueue::TPQLabelsInfo> GetLabelsForLegacyName(const TString& topic);
TVector<NPersQueue::TPQLabelsInfo> GetLabelsForCustomCluster(const TTopicConverterPtr& topic, TString cluster);
-TVector<NPersQueue::TPQLabelsInfo> GetLabelsForStream(const TTopicConverterPtr& topic, const TString& cloudId,
- const TString& dbId, const TString& folderId);
+TVector<NPersQueue::TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId,
+ const TString& dbId, const TString& folderId);
::NMonitoring::TDynamicCounterPtr GetCounters(::NMonitoring::TDynamicCounterPtr counters,
const TString& subsystem,
const TTopicConverterPtr& topic);
-::NMonitoring::TDynamicCounterPtr GetCountersForStream(::NMonitoring::TDynamicCounterPtr counters);
+::NMonitoring::TDynamicCounterPtr GetCountersForDataStream(::NMonitoring::TDynamicCounterPtr counters);
} // namespace NPersQueue
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 3df6f98827..525678c3d7 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -812,8 +812,8 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu
const TString& cloudId, const TString& dbId, const TString& folderId)
{
auto& topicCounters = TopicCounters[topic->GetInternalName()];
- auto subGroup = NPersQueue::GetCountersForStream(Counters);
- auto aggr = NPersQueue::GetLabelsForStream(topic, cloudId, dbId, folderId);
+ auto subGroup = NPersQueue::GetCountersForDataStream(Counters);
+ auto aggr = NPersQueue::GetLabelsForTopic(topic, cloudId, dbId, folderId);
const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}};
topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_locked_per_second"}, true, "name");
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 1283a3c2f5..4387a001f8 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -521,8 +521,8 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou
}
//now topic is checked, can create group for real topic, not garbage
- auto subGroup = NPersQueue::GetCountersForStream(Counters);
- auto aggr = NPersQueue::GetLabelsForStream(FullConverter, cloudId, dbId, folderId);
+ auto subGroup = NPersQueue::GetCountersForDataStream(Counters);
+ auto aggr = NPersQueue::GetLabelsForTopic(FullConverter, cloudId, dbId, folderId);
SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_created_per_second"}, true, "name");
SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_active_count"}, false, "name");