diff options
author | mokhotskii <mokhotskii@ydb.tech> | 2022-12-07 14:33:38 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@ydb.tech> | 2022-12-07 14:33:38 +0300 |
commit | 0c53a38909eec7c5872384fa373409c8a9ecd681 (patch) | |
tree | 8fb790eb7872b520e48cba5a11ea17268f8dee3e | |
parent | 3887283e448f5a9b05c6e3d062e482b84b241f17 (diff) | |
download | ydb-0c53a38909eec7c5872384fa373409c8a9ecd681.tar.gz |
Switch to cloud_id, folder_id, database_id in PQ
Switch to cloud_id, folder_id, database_id in PQ
Update metrics names
introduce new names
replace stream. prefix with topic. prefix
introduce topic counters in http proxy alongside stream. counters
-rw-r--r-- | ydb/core/http_proxy/custom_metrics.h | 86 | ||||
-rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 33 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 25 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 8 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/resources/counters_datastreams.html | 154 | ||||
-rw-r--r-- | ydb/library/persqueue/tests/counters.cpp | 6 | ||||
-rw-r--r-- | ydb/library/persqueue/topic_parser/counters.cpp | 10 | ||||
-rw-r--r-- | ydb/library/persqueue/topic_parser/counters.h | 6 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 4 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 4 |
10 files changed, 117 insertions, 219 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h index df27e091cc..39ddcba5a3 100644 --- a/ydb/core/http_proxy/custom_metrics.h +++ b/ydb/core/http_proxy/custom_metrics.h @@ -16,28 +16,20 @@ void FillOutputCustomMetrics(const TProtoResult& result, const THttpRequestConte Y_UNUSED(result, httpContext, ctx); } -TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name, bool setTopicPrefix = true) { - const TString topicPrefix = setTopicPrefix ? "topic" : "stream"; +TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name) { if (method.empty()) { - return {{"cloud", httpContext.CloudId}, {"folder", httpContext.FolderId}, - {"database", httpContext.DatabaseId}, {topicPrefix, httpContext.StreamName}, - {"name", name}}; + return {{"database", httpContext.DatabaseName}, {"cloud_id", httpContext.CloudId}, + {"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId}, + {"topic", httpContext.StreamName}, {"name", name}}; } - return {{"method", method}, {"cloud", httpContext.CloudId}, {"folder", httpContext.FolderId}, - {"database", httpContext.DatabaseId}, {topicPrefix, httpContext.StreamName}, - {"name", name}}; + return {{"database", httpContext.DatabaseName}, {"method", method}, {"cloud_id", httpContext.CloudId}, + {"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId}, + {"topic", httpContext.StreamName}, {"name", name}}; } -static const bool setStreamPrefix{false}; - template <> void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) { - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{request.records_size(), true, true, - BuildLabels("", httpContext, "stream.incoming_records_per_second", setStreamPrefix) - }); - ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{request.records_size(), true, true, BuildLabels("", httpContext, "topic.written_messages_per_second") @@ -48,19 +40,11 @@ void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request, bytes += rec.data().size() + rec.partition_key().size() + rec.explicit_hash_key().size(); } - /* deprecated metric */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.incoming_bytes_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, BuildLabels("", httpContext, "topic.written_bytes_per_second") }); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.put_records.bytes_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, BuildLabels("", httpContext, "api.data_streams.put_records.bytes_per_second") @@ -69,18 +53,10 @@ void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request, template <> void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) { - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.incoming_records_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels("", httpContext, "topic.written_messages_per_second") }); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.put_record.records_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels("", httpContext, "api.data_streams.put_record.messages_per_second") @@ -88,19 +64,11 @@ void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, c i64 bytes = request.data().size() + request.partition_key().size() + request.explicit_hash_key().size(); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.incoming_bytes_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, BuildLabels("", httpContext, "topic.written_bytes_per_second") }); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.put_record.bytes_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, BuildLabels("", httpContext, "api.data_streams.put_record.bytes_per_second") @@ -111,10 +79,6 @@ void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, c template <> void FillOutputCustomMetrics<PutRecordResult>(const PutRecordResult& result, const THttpRequestContext& httpContext, const TActorContext& ctx) { Y_UNUSED(result); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.put_record.success_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels("", httpContext, "api.data_streams.put_record.success_per_second") @@ -127,37 +91,21 @@ void FillOutputCustomMetrics<PutRecordsResult>(const PutRecordsResult& result, c i64 failed = result.failed_record_count(); i64 success = result.records_size() - failed; if (success > 0) { - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.put_records.success_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels("", httpContext, "api.data_streams.put_records.success_per_second") }); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{success, true, true, - BuildLabels("", httpContext, "stream.put_records.successfull_records_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{success, true, true, BuildLabels("", httpContext, "api.data_streams.put_records.successfull_messages_per_second") }); } - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{result.records_size(), true, true, - BuildLabels("", httpContext, "stream.put_records.total_records_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{result.records_size(), true, true, BuildLabels("", httpContext, "api.data_streams.put_records.total_messages_per_second") }); if (failed > 0) { - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{failed, true, true, - BuildLabels("", httpContext, "streams.put_records.failed_records_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{failed, true, true, BuildLabels("", httpContext, "api.data_streams.put_records.failed_messages_per_second") @@ -178,26 +126,6 @@ void FillOutputCustomMetrics<GetRecordsResult>(const GetRecordsResult& result, c ; }); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.get_records.success_per_second", setStreamPrefix)} - ); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{records_n, true, true, - BuildLabels("", httpContext, "stream.get_records.records_per_second", setStreamPrefix)} - ); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.get_records.bytes_per_second", setStreamPrefix)} - ); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{records_n, true, true, - BuildLabels("", httpContext, "stream.outgoing_records_per_second", setStreamPrefix)} - ); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.outgoing_bytes_per_second", setStreamPrefix)} - ); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels("", httpContext, "api.data_streams.get_records.success_per_second")} diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index ab90864659..269bd9a582 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -392,25 +392,14 @@ namespace NKikimr::NHttpProxy { } void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText) { - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{ - 1, true, true, - {{"method", Method}, - {"cloud", HttpContext.CloudId}, - {"folder", HttpContext.FolderId}, - {"database", HttpContext.DatabaseId}, - {"stream", HttpContext.StreamName}, - {"code", TStringBuilder() << (int)StatusToHttpCode(status)}, - {"name", "api.http.errors_per_second"}} - }); - ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, - {{"method", Method}, - {"cloud", HttpContext.CloudId}, - {"folder", HttpContext.FolderId}, - {"database", HttpContext.DatabaseId}, + {{"database", HttpContext.DatabaseName}, + {"method", Method}, + {"cloud_id", HttpContext.CloudId}, + {"folder_id", HttpContext.FolderId}, + {"database_id", HttpContext.DatabaseId}, {"topic", HttpContext.StreamName}, {"code", TStringBuilder() << (int)StatusToHttpCode(status)}, {"name", "api.data_streams.errors_per_second"}} @@ -443,10 +432,6 @@ namespace NKikimr::NHttpProxy { } FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels(Method, HttpContext, "api.http.requests_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels(Method, HttpContext, "api.data_streams.requests_per_second") @@ -460,10 +445,6 @@ namespace NKikimr::NHttpProxy { void ReportLatencyCounters(const TActorContext& ctx) { TDuration dur = ctx.Now() - StartTime; - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1, - BuildLabels(Method, HttpContext, "api.http.requests_duration_milliseconds", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1, BuildLabels(Method, HttpContext, "api.data_streams.requests_duration_milliseconds") @@ -478,10 +459,6 @@ namespace NKikimr::NHttpProxy { *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx); ReportLatencyCounters(ctx); - /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), - new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels(Method, HttpContext, "api.http.success_per_second", setStreamPrefix) - }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, BuildLabels(Method, HttpContext, "api.data_streams.success_per_second") diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 9ae32f6cec..40a7b6f535 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -851,20 +851,20 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { void TPartition::SetupStreamCounters(const TActorContext& ctx) { const auto topicName = TopicConverter->GetModernName(); auto counters = AppData(ctx)->Counters; - auto labels = NPersQueue::GetLabelsForStream(TopicConverter, CloudId, DbId, FolderId); + auto labels = NPersQueue::GetLabelsForTopic(TopicConverter, CloudId, DbId, FolderId); WriteBufferIsFullCounter.SetCounter( - NPersQueue::GetCountersForStream(counters), - {{"cloud", CloudId}, - {"folder", FolderId}, - {"database", DbId}, + NPersQueue::GetCountersForDataStream(counters), + {{"cloud_id", CloudId}, + {"folder_id", FolderId}, + {"database_id", DbId}, {"topic", TopicConverter->GetFederationPath()}, {"host", DCId}, {"partition", ToString<ui32>(Partition)}}, {"name", "api.topic_service.stream_write.buffer_brimmed_milliseconds", true}); InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForStream(counters), labels, + NPersQueue::GetCountersForDataStream(counters), labels, {{"name", "topic.write_lag_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {100, "100"}, {200, "200"}, {500, "500"}, @@ -873,7 +873,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {180'000,"180000"}, {9'999'999, "999999"}}, true)); MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForStream(counters), labels, + NPersQueue::GetCountersForDataStream(counters), labels, {{"name", "topic.written_message_size_bytes"}}, "bin", TVector<std::pair<ui64, TString>>{ {1024, "1024"}, {5120, "5120"}, {10'240, "10240"}, @@ -883,18 +883,17 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true)); BytesWritten = NKikimr::NPQ::TMultiCounter( - - NPersQueue::GetCountersForStream(counters), labels, {}, + NPersQueue::GetCountersForDataStream(counters), labels, {}, {"api.topic_service.stream_write.bytes_per_second", "topic.written_bytes_per_second"} , true, "name"); MsgsWritten = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForStream(counters), labels, {}, + NPersQueue::GetCountersForDataStream(counters), labels, {}, {"api.topic_service.stream_write.messages_per_second", "topic.written_messages_per_second"}, true, "name"); BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForStream(counters), labels, {}, + NPersQueue::GetCountersForDataStream(counters), labels, {}, {"topic.written_uncompressed_bytes_per_second"}, true, "name"); TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; @@ -908,7 +907,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForStream(counters), labels, + NPersQueue::GetCountersForDataStream(counters), labels, {{"name", "api.topic_service.stream_write.topic_throttled_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, @@ -919,7 +918,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForStream(counters), labels, + NPersQueue::GetCountersForDataStream(counters), labels, {{"name", "api.topic_service.stream_write.partition_throttled_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index c9993792bd..d9559779c9 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -322,9 +322,9 @@ struct TUserInfo { const TActorContext& ctx, const TString& dcId, const TString& partition, const TString& cloudId, const TString& dbId, const TString& folderId ) { - auto subgroup = NPersQueue::GetCountersForStream(AppData(ctx)->Counters); + auto subgroup = NPersQueue::GetCountersForDataStream(AppData(ctx)->Counters); auto aggregates = - NPersQueue::GetLabelsForStream(TopicConverter, cloudId, dbId, folderId); + NPersQueue::GetLabelsForTopic(TopicConverter, cloudId, dbId, folderId); BytesRead = TMultiCounter(subgroup, aggregates, {{"consumer", User}}, @@ -336,13 +336,13 @@ struct TUserInfo { "topic.read_messages_per_second"}, true, "name"); Counter.SetCounter(subgroup, - {{"cloud", cloudId}, {"folder", folderId}, {"database", dbId}, + {{"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId}, {"topic", TopicConverter->GetFederationPath()}, {"consumer", User}, {"host", dcId}, {"partition", partition}}, {"name", "topic.awaiting_consume_milliseconds", true}); ReadTimeLag.reset(new TPercentileCounter( - NPersQueue::GetCountersForStream(AppData(ctx)->Counters), aggregates, + NPersQueue::GetCountersForDataStream(AppData(ctx)->Counters), aggregates, {{"consumer", User}, {"name", "topic.read_lag_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"}, {1000, "1000"}, {2000, "2000"}, diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html index b52e418d24..7c39ad45a5 100644 --- a/ydb/core/persqueue/ut/resources/counters_datastreams.html +++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html @@ -1,93 +1,87 @@ <pre> -cloud=somecloud: +topic=topic: + name=api.topic_service.stream_write.bytes_per_second: 2700 + name=api.topic_service.stream_write.messages_per_second: 150 + name=topic.written_bytes_per_second: 2700 + name=topic.written_messages_per_second: 150 + name=topic.written_uncompressed_bytes_per_second: 1350 - folder=somefolder: + consumer=user: + name=api.topic_service.stream_read.bytes_per_second: 0 + name=api.topic_service.stream_read.messages_per_second: 0 + name=topic.read_bytes_per_second: 0 + name=topic.read_messages_per_second: 0 - database=PQ: + host=1: - topic=topic: - name=api.topic_service.stream_write.bytes_per_second: 2700 - name=api.topic_service.stream_write.messages_per_second: 150 - name=topic.written_bytes_per_second: 2700 - name=topic.written_messages_per_second: 150 - name=topic.written_uncompressed_bytes_per_second: 1350 + partition=0: + name=topic.awaiting_consume_milliseconds: 0 - consumer=user: - name=api.topic_service.stream_read.bytes_per_second: 0 - name=api.topic_service.stream_read.messages_per_second: 0 - name=topic.read_bytes_per_second: 0 - name=topic.read_messages_per_second: 0 + partition=1: + name=topic.awaiting_consume_milliseconds: 0 - host=1: + name=topic.read_lag_milliseconds: + bin=100: 0 + bin=1000: 0 + bin=10000: 0 + bin=180000: 0 + bin=200: 0 + bin=2000: 0 + bin=30000: 0 + bin=500: 0 + bin=5000: 0 + bin=60000: 0 + bin=999999: 0 - partition=0: - name=topic.awaiting_consume_milliseconds: 0 + host=1: - partition=1: - name=topic.awaiting_consume_milliseconds: 0 + partition=0: + name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0 - name=topic.read_lag_milliseconds: - bin=100: 0 - bin=1000: 0 - bin=10000: 0 - bin=180000: 0 - bin=200: 0 - bin=2000: 0 - bin=30000: 0 - bin=500: 0 - bin=5000: 0 - bin=60000: 0 - bin=999999: 0 + partition=1: + name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0 - host=1: + name=api.topic_service.stream_write.partition_throttled_milliseconds: + bin=0: 150 + bin=1: 0 + bin=10: 0 + bin=100: 0 + bin=1000: 0 + bin=10000: 0 + bin=20: 0 + bin=2500: 0 + bin=5: 0 + bin=50: 0 + bin=500: 0 + bin=5000: 0 + bin=999999: 0 - partition=0: - name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0 + name=topic.write_lag_milliseconds: + bin=100: 150 + bin=1000: 0 + bin=10000: 0 + bin=180000: 0 + bin=200: 0 + bin=2000: 0 + bin=30000: 0 + bin=500: 0 + bin=5000: 0 + bin=60000: 0 + bin=999999: 0 - partition=1: - name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0 - - name=api.topic_service.stream_write.partition_throttled_milliseconds: - bin=0: 150 - bin=1: 0 - bin=10: 0 - bin=100: 0 - bin=1000: 0 - bin=10000: 0 - bin=20: 0 - bin=2500: 0 - bin=5: 0 - bin=50: 0 - bin=500: 0 - bin=5000: 0 - bin=999999: 0 - - name=topic.write_lag_milliseconds: - bin=100: 150 - bin=1000: 0 - bin=10000: 0 - bin=180000: 0 - bin=200: 0 - bin=2000: 0 - bin=30000: 0 - bin=500: 0 - bin=5000: 0 - bin=60000: 0 - bin=999999: 0 - - name=topic.written_message_size_bytes: - bin=1024: 150 - bin=10240: 0 - bin=102400: 0 - bin=1048576: 0 - bin=10485760: 0 - bin=20480: 0 - bin=204800: 0 - bin=2097152: 0 - bin=5120: 0 - bin=51200: 0 - bin=524288: 0 - bin=5242880: 0 - bin=67108864: 0 - bin=99999999: 0 + name=topic.written_message_size_bytes: + bin=1024: 150 + bin=10240: 0 + bin=102400: 0 + bin=1048576: 0 + bin=10485760: 0 + bin=20480: 0 + bin=204800: 0 + bin=2097152: 0 + bin=5120: 0 + bin=51200: 0 + bin=524288: 0 + bin=5242880: 0 + bin=67108864: 0 + bin=99999999: 0 </pre> diff --git a/ydb/library/persqueue/tests/counters.cpp b/ydb/library/persqueue/tests/counters.cpp index 28600f0f73..2cddb6a157 100644 --- a/ydb/library/persqueue/tests/counters.cpp +++ b/ydb/library/persqueue/tests/counters.cpp @@ -96,9 +96,9 @@ NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters, TStringBuilder queryBuilder; queryBuilder << "/counters/counters=" << counters << - "/cloud=" << cloudId << - "/folder=" << folderId << - "/database=" << databaseId << + "/cloud_id=" << cloudId << + "/folder_id=" << folderId << + "/database_id=" << databaseId << "/topic=" << JoinRange("%2F", pathItems.begin(), pathItems.end()); if (consumer) { diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp index 695d1c7a66..ded7dbf7a8 100644 --- a/ydb/library/persqueue/topic_parser/counters.cpp +++ b/ydb/library/persqueue/topic_parser/counters.cpp @@ -15,7 +15,7 @@ namespace NPersQueue { ->GetSubgroup("Topic", topic->GetShortClientsideName()); } -::NMonitoring::TDynamicCounterPtr GetCountersForStream(::NMonitoring::TDynamicCounterPtr counters) +::NMonitoring::TDynamicCounterPtr GetCountersForDataStream(::NMonitoring::TDynamicCounterPtr counters) { return counters->GetSubgroup("counters", "datastreams"); } @@ -37,12 +37,12 @@ TVector<TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic) return GetLabelsForCustomCluster(topic, topic->GetCluster()); } -TVector<TPQLabelsInfo> GetLabelsForStream(const TTopicConverterPtr& topic, const TString& cloudId, +TVector<TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId, const TString& dbId, const TString& folderId) { TVector<TPQLabelsInfo> res = { - {{{"cloud", cloudId}}, {cloudId}}, - {{{"folder", folderId}}, {folderId}}, - {{{"database", dbId}}, {dbId}}, + {{{"cloud_id", cloudId}}, {cloudId}}, + {{{"folder_id", folderId}}, {folderId}}, + {{{"database_id", dbId}}, {dbId}}, {{{"topic", topic->GetClientsideName()}}, {topic->GetClientsideName()}}}; return res; } diff --git a/ydb/library/persqueue/topic_parser/counters.h b/ydb/library/persqueue/topic_parser/counters.h index 32af523141..435fb61d88 100644 --- a/ydb/library/persqueue/topic_parser/counters.h +++ b/ydb/library/persqueue/topic_parser/counters.h @@ -9,11 +9,11 @@ namespace NPersQueue { TVector<NPersQueue::TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic); //TVector<NPersQueue::TPQLabelsInfo> GetLabelsForLegacyName(const TString& topic); TVector<NPersQueue::TPQLabelsInfo> GetLabelsForCustomCluster(const TTopicConverterPtr& topic, TString cluster); -TVector<NPersQueue::TPQLabelsInfo> GetLabelsForStream(const TTopicConverterPtr& topic, const TString& cloudId, - const TString& dbId, const TString& folderId); +TVector<NPersQueue::TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId, + const TString& dbId, const TString& folderId); ::NMonitoring::TDynamicCounterPtr GetCounters(::NMonitoring::TDynamicCounterPtr counters, const TString& subsystem, const TTopicConverterPtr& topic); -::NMonitoring::TDynamicCounterPtr GetCountersForStream(::NMonitoring::TDynamicCounterPtr counters); +::NMonitoring::TDynamicCounterPtr GetCountersForDataStream(::NMonitoring::TDynamicCounterPtr counters); } // namespace NPersQueue diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 3df6f98827..525678c3d7 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -812,8 +812,8 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu const TString& cloudId, const TString& dbId, const TString& folderId) { auto& topicCounters = TopicCounters[topic->GetInternalName()]; - auto subGroup = NPersQueue::GetCountersForStream(Counters); - auto aggr = NPersQueue::GetLabelsForStream(topic, cloudId, dbId, folderId); + auto subGroup = NPersQueue::GetCountersForDataStream(Counters); + auto aggr = NPersQueue::GetLabelsForTopic(topic, cloudId, dbId, folderId); const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}}; topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_locked_per_second"}, true, "name"); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 1283a3c2f5..4387a001f8 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -521,8 +521,8 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou } //now topic is checked, can create group for real topic, not garbage - auto subGroup = NPersQueue::GetCountersForStream(Counters); - auto aggr = NPersQueue::GetLabelsForStream(FullConverter, cloudId, dbId, folderId); + auto subGroup = NPersQueue::GetCountersForDataStream(Counters); + auto aggr = NPersQueue::GetLabelsForTopic(FullConverter, cloudId, dbId, folderId); SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_created_per_second"}, true, "name"); SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_active_count"}, false, "name"); |