diff options
author | mokhotskii <[email protected]> | 2022-12-04 21:25:13 +0300 |
---|---|---|
committer | mokhotskii <[email protected]> | 2022-12-04 21:25:13 +0300 |
commit | 1ecfe41ec264d63b4ddc3605ae7ed150c2996e91 (patch) | |
tree | e8b23c6bf1a3f9ad162ff3d5b807b90b423c9b63 | |
parent | 3680ca61e6e893b6ddbba096616c807efd3d7c2b (diff) |
Update metrics names
Update metric names:
* replace the "stream." prefix with "topic."
* introduce the "api.topic_service." prefix
* introduce the "api.data_streams." prefix
* drop some metrics
-rw-r--r-- | ydb/core/http_proxy/custom_metrics.h | 113 | ||||
-rw-r--r-- | ydb/core/http_proxy/events.h | 4 | ||||
-rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 32 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 27 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 16 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/counters_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/resources/counters_datastreams.html | 69 | ||||
-rw-r--r-- | ydb/library/persqueue/tests/counters.cpp | 12 | ||||
-rw-r--r-- | ydb/library/persqueue/tests/counters.h | 4 | ||||
-rw-r--r-- | ydb/library/persqueue/topic_parser/counters.cpp | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/partition_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 15 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 35 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp | 85 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 2 |
15 files changed, 256 insertions, 166 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h index 0b7bc5e3ee3..df27e091ccf 100644 --- a/ydb/core/http_proxy/custom_metrics.h +++ b/ydb/core/http_proxy/custom_metrics.h @@ -3,7 +3,6 @@ #include "events.h" #include "http_req.h" - namespace NKikimr::NHttpProxy { using namespace Ydb::DataStreams::V1; @@ -17,23 +16,31 @@ void FillOutputCustomMetrics(const TProtoResult& result, const THttpRequestConte Y_UNUSED(result, httpContext, ctx); } -TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name) { +TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name, bool setTopicPrefix = true) { + const TString topicPrefix = setTopicPrefix ? "topic" : "stream"; if (method.empty()) { return {{"cloud", httpContext.CloudId}, {"folder", httpContext.FolderId}, - {"database", httpContext.DatabaseId}, {"stream", httpContext.StreamName}, + {"database", httpContext.DatabaseId}, {topicPrefix, httpContext.StreamName}, {"name", name}}; } return {{"method", method}, {"cloud", httpContext.CloudId}, {"folder", httpContext.FolderId}, - {"database", httpContext.DatabaseId}, {"stream", httpContext.StreamName}, + {"database", httpContext.DatabaseId}, {topicPrefix, httpContext.StreamName}, {"name", name}}; } +static const bool setStreamPrefix{false}; + template <> void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) { + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{request.records_size(), true, true, + BuildLabels("", httpContext, "stream.incoming_records_per_second", setStreamPrefix) + }); + ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{request.records_size(), true, true, - BuildLabels("", httpContext, "stream.incoming_records_per_second") + 
BuildLabels("", httpContext, "topic.written_messages_per_second") }); i64 bytes = 0; @@ -41,38 +48,62 @@ void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request, bytes += rec.data().size() + rec.partition_key().size() + rec.explicit_hash_key().size(); } + /* deprecated metric */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{bytes, true, true, + BuildLabels("", httpContext, "stream.incoming_bytes_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.incoming_bytes_per_second") + BuildLabels("", httpContext, "topic.written_bytes_per_second") }); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{bytes, true, true, + BuildLabels("", httpContext, "stream.put_records.bytes_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.put_records.bytes_per_second") + BuildLabels("", httpContext, "api.data_streams.put_records.bytes_per_second") }); } template <> void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) { + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels("", httpContext, "stream.incoming_records_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.incoming_records_per_second") + BuildLabels("", httpContext, "topic.written_messages_per_second") + }); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels("", httpContext, "stream.put_record.records_per_second", setStreamPrefix) }); ctx.Send(MakeMetricsServiceID(), new 
TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.put_record.records_per_second") + BuildLabels("", httpContext, "api.data_streams.put_record.messages_per_second") }); i64 bytes = request.data().size() + request.partition_key().size() + request.explicit_hash_key().size(); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{bytes, true, true, + BuildLabels("", httpContext, "stream.incoming_bytes_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.incoming_bytes_per_second") + BuildLabels("", httpContext, "topic.written_bytes_per_second") }); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{bytes, true, true, + BuildLabels("", httpContext, "stream.put_record.bytes_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.put_record.bytes_per_second") + BuildLabels("", httpContext, "api.data_streams.put_record.bytes_per_second") }); } @@ -80,9 +111,13 @@ void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, c template <> void FillOutputCustomMetrics<PutRecordResult>(const PutRecordResult& result, const THttpRequestContext& httpContext, const TActorContext& ctx) { Y_UNUSED(result); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels("", httpContext, "stream.put_record.success_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.put_record.success_per_second") + BuildLabels("", httpContext, "api.data_streams.put_record.success_per_second") }); } @@ -92,24 +127,40 @@ void FillOutputCustomMetrics<PutRecordsResult>(const 
PutRecordsResult& result, c i64 failed = result.failed_record_count(); i64 success = result.records_size() - failed; if (success > 0) { + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels("", httpContext, "stream.put_records.success_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.put_records.success_per_second") + BuildLabels("", httpContext, "api.data_streams.put_records.success_per_second") + }); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{success, true, true, + BuildLabels("", httpContext, "stream.put_records.successfull_records_per_second", setStreamPrefix) }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{success, true, true, - BuildLabels("", httpContext, "stream.put_records.successfull_records_per_second") + BuildLabels("", httpContext, "api.data_streams.put_records.successfull_messages_per_second") }); } + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{result.records_size(), true, true, + BuildLabels("", httpContext, "stream.put_records.total_records_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{result.records_size(), true, true, - BuildLabels("", httpContext, "stream.put_records.total_records_per_second") + BuildLabels("", httpContext, "api.data_streams.put_records.total_messages_per_second") }); if (failed > 0) { + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{failed, true, true, + BuildLabels("", httpContext, "streams.put_records.failed_records_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{failed, true, true, - BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") + 
BuildLabels("", httpContext, "api.data_streams.put_records.failed_messages_per_second") }); } } @@ -127,25 +178,45 @@ void FillOutputCustomMetrics<GetRecordsResult>(const GetRecordsResult& result, c ; }); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels("", httpContext, "stream.get_records.success_per_second", setStreamPrefix)} + ); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{records_n, true, true, + BuildLabels("", httpContext, "stream.get_records.records_per_second", setStreamPrefix)} + ); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{bytes, true, true, + BuildLabels("", httpContext, "stream.get_records.bytes_per_second", setStreamPrefix)} + ); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{records_n, true, true, + BuildLabels("", httpContext, "stream.outgoing_records_per_second", setStreamPrefix)} + ); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{bytes, true, true, + BuildLabels("", httpContext, "stream.outgoing_bytes_per_second", setStreamPrefix)} + ); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels("", httpContext, "stream.get_records.success_per_second")} + BuildLabels("", httpContext, "api.data_streams.get_records.success_per_second")} ); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{records_n, true, true, - BuildLabels("", httpContext, "stream.get_records.records_per_second")} + BuildLabels("", httpContext, "api.data_streams.get_records.messages_per_second")} ); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.get_records.bytes_per_second")} + BuildLabels("", httpContext, "api.data_streams.get_records.bytes_per_second")} ); 
ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{records_n, true, true, - BuildLabels("", httpContext, "stream.outgoing_records_per_second")} + BuildLabels("", httpContext, "topic.read_messages_per_second")} ); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{bytes, true, true, - BuildLabels("", httpContext, "stream.outgoing_bytes_per_second")} + BuildLabels("", httpContext, "topic.read_bytes_per_second")} ); } diff --git a/ydb/core/http_proxy/events.h b/ydb/core/http_proxy/events.h index cd32e92c34b..c859597ebe2 100644 --- a/ydb/core/http_proxy/events.h +++ b/ydb/core/http_proxy/events.h @@ -130,7 +130,6 @@ namespace NKikimr::NHttpProxy { TEvClientReady() {} }; - struct TEvError : public TEventLocal<TEvError, EvError> { NYdb::EStatus Status; TString Response; @@ -140,9 +139,6 @@ namespace NKikimr::NHttpProxy { , Response(response) {} }; - - - }; diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index dbb2926d81e..ab908646594 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -392,7 +392,7 @@ namespace NKikimr::NHttpProxy { } void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText) { - ctx.Send(MakeMetricsServiceID(), + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, {{"method", Method}, @@ -404,6 +404,18 @@ namespace NKikimr::NHttpProxy { {"name", "api.http.errors_per_second"}} }); + ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{ + 1, true, true, + {{"method", Method}, + {"cloud", HttpContext.CloudId}, + {"folder", HttpContext.FolderId}, + {"database", HttpContext.DatabaseId}, + {"topic", HttpContext.StreamName}, + {"code", TStringBuilder() << (int)StatusToHttpCode(status)}, + {"name", "api.data_streams.errors_per_second"}} + }); + HttpContext.ResponseData.Status = status; HttpContext.ResponseData.ErrorText = errorText; 
HttpContext.DoReply(ctx); @@ -431,9 +443,13 @@ namespace NKikimr::NHttpProxy { } FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels(Method, HttpContext, "api.http.requests_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels(Method, HttpContext, "api.http.requests_per_second") + BuildLabels(Method, HttpContext, "api.data_streams.requests_per_second") }); CreateClient(ctx); return; @@ -444,9 +460,13 @@ namespace NKikimr::NHttpProxy { void ReportLatencyCounters(const TActorContext& ctx) { TDuration dur = ctx.Now() - StartTime; + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1, + BuildLabels(Method, HttpContext, "api.http.requests_duration_milliseconds", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1, - BuildLabels(Method, HttpContext, "api.http.requests_duration_milliseconds") + BuildLabels(Method, HttpContext, "api.data_streams.requests_duration_milliseconds") }); } @@ -458,9 +478,13 @@ namespace NKikimr::NHttpProxy { *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx); ReportLatencyCounters(ctx); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), + new TEvServerlessProxy::TEvCounter{1, true, true, + BuildLabels(Method, HttpContext, "api.http.success_per_second", setStreamPrefix) + }); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{1, true, true, - BuildLabels(Method, HttpContext, "api.http.success_per_second") + BuildLabels(Method, HttpContext, "api.data_streams.success_per_second") }); HttpContext.DoReply(ctx); diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 
3625bbf130b..1f654e259b8 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -858,14 +858,14 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {{"cloud", CloudId}, {"folder", FolderId}, {"database", DbId}, - {"stream", TopicConverter->GetFederationPath()}, + {"topic", TopicConverter->GetFederationPath()}, {"host", DCId}, - {"shard", ToString<ui32>(Partition)}}, - {"name", "stream.internal_write.buffer_brimmed_duration_ms", true}); + {"partition", ToString<ui32>(Partition)}}, + {"name", "api.topic_service.stream_write.buffer_brimmed_milliseconds", true}); InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForStream(counters), labels, - {{"name", "stream.internal_write.time_lags_milliseconds"}}, "bin", + {{"name", "topic.write_lag_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {100, "100"}, {200, "200"}, {500, "500"}, {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, @@ -874,7 +874,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForStream(counters), labels, - {{"name", "stream.internal_write.record_size_bytes"}}, "bin", + {{"name", "topic.written_message_size_bytes"}}, "bin", TVector<std::pair<ui64, TString>>{ {1024, "1024"}, {5120, "5120"}, {10'240, "10240"}, {20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"}, @@ -885,20 +885,17 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { BytesWritten = NKikimr::NPQ::TMultiCounter( NPersQueue::GetCountersForStream(counters), labels, {}, - {"stream.internal_write.bytes_per_second", - "stream.incoming_bytes_per_second"} , true, "name"); + {"api.topic_service.stream_write.bytes_per_second", + "topic.written_bytes_per_second"} , true, "name"); MsgsWritten = NKikimr::NPQ::TMultiCounter( NPersQueue::GetCountersForStream(counters), labels, {}, 
- {"stream.internal_write.records_per_second", - "stream.incoming_records_per_second"}, true, "name"); + {"api.topic_service.stream_write.messages_per_second", + "topic.written_messages_per_second"}, true, "name"); BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter( NPersQueue::GetCountersForStream(counters), labels, {}, - {"stream.internal_write.uncompressed_bytes_per_second"}, true, "name"); - BytesWrittenComp = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForStream(counters), labels, {}, - {"stream.internal_write.compacted_bytes_per_second"}, true, "name"); + {"topic.written_uncompressed_bytes_per_second"}, true, "name"); TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); @@ -912,7 +909,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForStream(counters), labels, - {{"name", "stream.internal_write.topic_write_quota_wait_milliseconds"}}, "bin", + {{"name", "api.topic_service.stream_write.topic_throttled_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, @@ -923,7 +920,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForStream(counters), labels, - {{"name", "stream.internal_write.partition_write_quota_wait_milliseconds"}}, "bin", + {{"name", "api.topic_service.stream_write.partition_throttled_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, {20, "20"}, {50, "50"}, {100, "100"}, {500, "500"}, diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 
dfc2dcde0e2..168942c61d4 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -328,22 +328,22 @@ struct TUserInfo { BytesRead = TMultiCounter(subgroup, aggregates, {{"consumer", User}}, - {"stream.internal_read.bytes_per_second", - "stream.outgoing_bytes_per_second"}, true, "name"); + {"api.topic_service.stream_read.bytes_per_second", + "topic.read_bytes_per_second"}, true, "name"); MsgsRead = TMultiCounter(subgroup, aggregates, {{"consumer", User}}, - {"stream.internal_read.records_per_second", - "stream.outgoing_records_per_second"}, true, "name"); + {"api.topic_service.stream_read.messages_per_second", + "topic.read_messages_per_second"}, true, "name"); Counter.SetCounter(subgroup, {{"cloud", cloudId}, {"folder", folderId}, {"database", dbId}, - {"stream", TopicConverter->GetFederationPath()}, - {"consumer", User}, {"host", dcId}, {"shard", partition}}, - {"name", "stream.await_operating_milliseconds", true}); + {"topic", TopicConverter->GetFederationPath()}, + {"consumer", User}, {"host", dcId}, {"partition", partition}}, + {"name", "topic.awaiting_consume_milliseconds", true}); ReadTimeLag.reset(new TPercentileCounter( NPersQueue::GetCountersForStream(AppData(ctx)->Counters), aggregates, - {{"consumer", User}, {"name", "stream.internal_read.time_lags_milliseconds"}}, "bin", + {{"consumer", User}, {"name", "topic.read_lag_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"}, {1000, "1000"}, {2000, "2000"}, {5000, "5000"}, {10'000, "10000"}, diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index d57e5570337..f3d91e1d67e 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -130,7 +130,7 @@ Y_UNIT_TEST(PartitionFirstClass) { TStringStream countersStr; dbGroup->OutputHtml(countersStr); const TString referenceCounters = NResource::Find(TStringBuf("counters_datastreams.html")); - 
UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters); + UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters); } } diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html index dfe910e132d..b52e418d24e 100644 --- a/ydb/core/persqueue/ut/resources/counters_datastreams.html +++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html @@ -5,29 +5,28 @@ cloud=somecloud: database=PQ: - stream=topic: - name=stream.incoming_bytes_per_second: 2700 - name=stream.incoming_records_per_second: 150 - name=stream.internal_write.bytes_per_second: 2700 - name=stream.internal_write.compacted_bytes_per_second: 3720 - name=stream.internal_write.records_per_second: 150 - name=stream.internal_write.uncompressed_bytes_per_second: 1350 + topic=topic: + name=api.topic_service.stream_write.bytes_per_second: 2700 + name=api.topic_service.stream_write.messages_per_second: 150 + name=topic.written_bytes_per_second: 2700 + name=topic.written_messages_per_second: 150 + name=topic.written_uncompressed_bytes_per_second: 1350 consumer=user: - name=stream.internal_read.bytes_per_second: 0 - name=stream.internal_read.records_per_second: 0 - name=stream.outgoing_bytes_per_second: 0 - name=stream.outgoing_records_per_second: 0 + name=api.topic_service.stream_read.bytes_per_second: 0 + name=api.topic_service.stream_read.messages_per_second: 0 + name=topic.read_bytes_per_second: 0 + name=topic.read_messages_per_second: 0 host=1: - shard=0: - name=stream.await_operating_milliseconds: 0 + partition=0: + name=topic.awaiting_consume_milliseconds: 0 - shard=1: - name=stream.await_operating_milliseconds: 0 + partition=1: + name=topic.awaiting_consume_milliseconds: 0 - name=stream.internal_read.time_lags_milliseconds: + name=topic.read_lag_milliseconds: bin=100: 0 bin=1000: 0 bin=10000: 0 @@ -42,13 +41,13 @@ cloud=somecloud: host=1: - shard=0: - name=stream.internal_write.buffer_brimmed_duration_ms: 0 + 
partition=0: + name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0 - shard=1: - name=stream.internal_write.buffer_brimmed_duration_ms: 0 + partition=1: + name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0 - name=stream.internal_write.partition_write_quota_wait_milliseconds: + name=api.topic_service.stream_write.partition_throttled_milliseconds: bin=0: 150 bin=1: 0 bin=10: 0 @@ -63,7 +62,20 @@ cloud=somecloud: bin=5000: 0 bin=999999: 0 - name=stream.internal_write.record_size_bytes: + name=topic.write_lag_milliseconds: + bin=100: 150 + bin=1000: 0 + bin=10000: 0 + bin=180000: 0 + bin=200: 0 + bin=2000: 0 + bin=30000: 0 + bin=500: 0 + bin=5000: 0 + bin=60000: 0 + bin=999999: 0 + + name=topic.written_message_size_bytes: bin=1024: 150 bin=10240: 0 bin=102400: 0 @@ -78,17 +90,4 @@ cloud=somecloud: bin=5242880: 0 bin=67108864: 0 bin=99999999: 0 - - name=stream.internal_write.time_lags_milliseconds: - bin=100: 150 - bin=1000: 0 - bin=10000: 0 - bin=180000: 0 - bin=200: 0 - bin=2000: 0 - bin=30000: 0 - bin=500: 0 - bin=5000: 0 - bin=60000: 0 - bin=999999: 0 </pre> diff --git a/ydb/library/persqueue/tests/counters.cpp b/ydb/library/persqueue/tests/counters.cpp index 2f3af930800..28600f0f73b 100644 --- a/ydb/library/persqueue/tests/counters.cpp +++ b/ydb/library/persqueue/tests/counters.cpp @@ -88,18 +88,18 @@ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, co NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters, const TString& cloudId, const TString& databaseId, - const TString& folderId, const TString& streamName, + const TString& folderId, const TString& topicName, const TString& consumer, const TString& host, - const TString& shard) { + const TString& partition) { bool mayFail = false; - TVector<TString> pathItems = SplitString(streamName, "/"); + TVector<TString> pathItems = SplitString(topicName, "/"); TStringBuilder queryBuilder; queryBuilder << "/counters/counters=" << counters << 
"/cloud=" << cloudId << "/folder=" << folderId << "/database=" << databaseId << - "/stream=" << JoinRange("%2F", pathItems.begin(), pathItems.end()); + "/topic=" << JoinRange("%2F", pathItems.begin(), pathItems.end()); if (consumer) { queryBuilder << @@ -111,9 +111,9 @@ NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters, "/host=" << host; } - if (shard) { + if (partition) { queryBuilder << - "/shard=" << shard; + "/partition=" << partition; mayFail = true; } diff --git a/ydb/library/persqueue/tests/counters.h b/ydb/library/persqueue/tests/counters.h index 9b6958623e0..588215b7ae0 100644 --- a/ydb/library/persqueue/tests/counters.h +++ b/ydb/library/persqueue/tests/counters.h @@ -17,8 +17,8 @@ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, co NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters, const TString& cloudId, const TString& databaseId, - const TString& folderId, const TString& streamName, + const TString& folderId, const TString& topicName, const TString& consumer, const TString& host, - const TString& shard); + const TString& partition); } // NKikimr::NPersQueueTests diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp index baf401d165b..695d1c7a669 100644 --- a/ydb/library/persqueue/topic_parser/counters.cpp +++ b/ydb/library/persqueue/topic_parser/counters.cpp @@ -43,7 +43,7 @@ TVector<TPQLabelsInfo> GetLabelsForStream(const TTopicConverterPtr& topic, const {{{"cloud", cloudId}}, {cloudId}}, {{{"folder", folderId}}, {folderId}}, {{{"database", dbId}}, {dbId}}, - {{{"stream", topic->GetClientsideName()}}, {topic->GetClientsideName()}}}; + {{{"topic", topic->GetClientsideName()}}, {topic->GetClientsideName()}}}; return res; } diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index 7267c418397..d1b18bf5a2b 100644 --- 
a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -867,7 +867,9 @@ void TPartitionActor::Handle(TEvPersQueue::TEvHasDataInfoResponse::TPtr& ev, con if (!WaitForData) return; - Counters.WaitsForData.Inc(); + if (Counters.WaitsForData) { + Counters.WaitsForData.Inc(); + } Y_VERIFY(record.HasEndOffset()); Y_VERIFY(EndOffset <= record.GetEndOffset()); //end offset could not be changed if no data arrived, but signal will be sended anyway after timeout diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 85a879c661f..3df6f988275 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -816,14 +816,13 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu auto aggr = NPersQueue::GetLabelsForStream(topic, cloudId, dbId, folderId); const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}}; - topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true, "name"); - topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true, "name"); - topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false, "name"); - topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false, "name"); - topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false, "name"); - topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true, "name"); - topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, 
{"stream.internal_read.commits_per_second"}, true, "name"); - topicCounters.WaitsForData = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true, "name"); + topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_locked_per_second"}, true, "name"); + topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_released_per_second"}, true, "name"); + topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_to_be_released_count"}, false, "name"); + topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_to_be_locked_count"}, false, "name"); + topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_locked_count"}, false, "name"); + topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_errors_per_second"}, true, "name"); + topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.commits_per_second"}, true, "name"); topicCounters.CommitLatency = CommitLatency; topicCounters.SLIBigLatency = SLIBigLatency; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index c5857901a45..1283a3c2f5c 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -286,8 +286,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) { if (SessionsActive) { SessionsActive.Dec(); - BytesInflight.Dec(BytesInflight_); - BytesInflightTotal.Dec(BytesInflightTotal_); + if (BytesInflight && BytesInflightTotal) { + BytesInflight.Dec(BytesInflight_); + BytesInflightTotal.Dec(BytesInflightTotal_); 
+ } } LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " is DEAD"); @@ -522,11 +524,9 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou auto subGroup = NPersQueue::GetCountersForStream(Counters); auto aggr = NPersQueue::GetLabelsForStream(FullConverter, cloudId, dbId, folderId); - BytesInflight = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding"}, false, "name"); - BytesInflightTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding_total"}, false, "name"); - SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_created_per_second"}, true, "name"); - SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_active"}, false, "name"); - Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.errors_per_second"}, true, "name"); + SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_created_per_second"}, true, "name"); + SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_active_count"}, false, "name"); + Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.errors_per_second"}, true, "name"); SessionsCreated.Inc(); @@ -1022,8 +1022,9 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T AcceptedRequests.emplace_back(std::move(writeRequest)); BytesInflight_ -= diff; - BytesInflight.Dec(diff); - + if (BytesInflight) { + BytesInflight.Dec(diff); + } if (!NextRequestInited && BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended NextRequestInited = true; if (!Request->GetStreamCtx()->Read()) { @@ -1177,7 +1178,9 @@ void 
TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T ui64 diff = writeRequest->ByteSize; BytesInflightTotal_ -= diff; - BytesInflightTotal.Dec(diff); + if (BytesInflightTotal) { + BytesInflightTotal.Dec(diff); + } CheckFinish(ctx); } @@ -1276,8 +1279,10 @@ void TWriteSessionActor<UseMigrationProtocol>::SendRequest(typename TWriteReques diff += request->PartitionWriteRequest->Record.ByteSize(); BytesInflight_ += diff; BytesInflightTotal_ += diff; - BytesInflight.Inc(diff); - BytesInflightTotal.Inc(diff); + if (BytesInflight && BytesInflightTotal) { + BytesInflight.Inc(diff); + BytesInflightTotal.Inc(diff); + } ctx.Send(Writer, std::move(request->PartitionWriteRequest)); SentRequests.push_back(std::move(request)); @@ -1466,8 +1471,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e ui64 diff = ev->Get()->Request.ByteSize(); BytesInflight_ += diff; BytesInflightTotal_ += diff; - BytesInflight.Inc(diff); - BytesInflightTotal.Inc(diff); + if (BytesInflight && BytesInflightTotal) { + BytesInflight.Inc(diff); + BytesInflightTotal.Inc(diff); + } if (BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended Y_VERIFY(NextRequestInited); diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index fe04c78c367..dbcf9257ddb 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -387,81 +387,76 @@ namespace NKikimr::NPersQueueTests { checkCounters(monPort, { - "stream.internal_read.commits_per_second", - "stream.internal_read.partitions_errors_per_second", - "stream.internal_read.partitions_locked", - "stream.internal_read.partitions_locked_per_second", - "stream.internal_read.partitions_released_per_second", - "stream.internal_read.partitions_to_be_locked", - "stream.internal_read.partitions_to_be_released", - 
"stream.internal_read.waits_for_data", - "stream.internal_write.bytes_proceeding", - "stream.internal_write.bytes_proceeding_total", - "stream.internal_write.errors_per_second", - "stream.internal_write.sessions_active", - "stream.internal_write.sessions_created_per_second", + "api.topic_service.stream_read.commits_per_second", + "api.topic_service.stream_read.partitions_errors_per_second", + "api.topic_service.stream_read.partitions_locked_count", + "api.topic_service.stream_read.partitions_locked_per_second", + "api.topic_service.stream_read.partitions_released_per_second", + "api.topic_service.stream_read.partitions_to_be_locked_count", + "api.topic_service.stream_read.partitions_to_be_released_count", + "api.topic_service.stream_write.errors_per_second", + "api.topic_service.stream_write.sessions_active_count", + "api.topic_service.stream_write.sessions_created_per_second", }, topicName, "", "", "" ); checkCounters(monPort, { - "stream.internal_read.commits_per_second", - "stream.internal_read.partitions_errors_per_second", - "stream.internal_read.partitions_locked", - "stream.internal_read.partitions_locked_per_second", - "stream.internal_read.partitions_released_per_second", - "stream.internal_read.partitions_to_be_locked", - "stream.internal_read.partitions_to_be_released", - "stream.internal_read.waits_for_data", + "api.topic_service.stream_read.commits_per_second", + "api.topic_service.stream_read.partitions_errors_per_second", + "api.topic_service.stream_read.partitions_locked_count", + "api.topic_service.stream_read.partitions_locked_per_second", + "api.topic_service.stream_read.partitions_released_per_second", + "api.topic_service.stream_read.partitions_to_be_locked_count", + "api.topic_service.stream_read.partitions_to_be_released_count", }, topicName, consumerName, "", "" ); checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), { - "stream.internal_read.time_lags_milliseconds", - "stream.incoming_bytes_per_second", - 
"stream.incoming_records_per_second", - "stream.internal_write.bytes_per_second", - "stream.internal_write.compacted_bytes_per_second", - "stream.internal_write.partition_write_quota_wait_milliseconds", - "stream.internal_write.record_size_bytes", - "stream.internal_write.records_per_second", - "stream.internal_write.time_lags_milliseconds", - "stream.internal_write.uncompressed_bytes_per_second", - "stream.await_operating_milliseconds", - "stream.internal_write.buffer_brimmed_duration_ms", - "stream.internal_read.bytes_per_second", - "stream.internal_read.records_per_second", - "stream.outgoing_bytes_per_second", - "stream.outgoing_records_per_second", + "topic.read_lag_milliseconds", + "topic.written_bytes_per_second", + "topic.written_messages_per_second", + "api.topic_service.stream_write.bytes_per_second", + "api.topic_service.stream_write.partition_throttled_milliseconds", + "topic.written_message_size_bytes", + "api.topic_service.stream_write.messages_per_second", + "topic.write_lag_milliseconds", + "topic.written_uncompressed_bytes_per_second", + "topic.awaiting_consume_milliseconds", + "api.topic_service.stream_write.buffer_brimmed_milliseconds", + "api.topic_service.stream_read.bytes_per_second", + "api.topic_service.stream_read.messages_per_second", + "topic.read_bytes_per_second", + "topic.read_messages_per_second", }, topicName, "", "", "" ); checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), { - "stream.internal_read.time_lags_milliseconds", - "stream.await_operating_milliseconds", - "stream.internal_read.bytes_per_second", - "stream.internal_read.records_per_second", - "stream.outgoing_bytes_per_second", - "stream.outgoing_records_per_second", + "topic.read_lag_milliseconds", + "topic.awaiting_consume_milliseconds", + "api.topic_service.stream_read.bytes_per_second", + "api.topic_service.stream_read.messages_per_second", + "topic.read_bytes_per_second", + "topic.read_messages_per_second", }, topicName, consumerName, "", "" ); 
checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), { - "stream.await_operating_milliseconds" + "topic.awaiting_consume_milliseconds" }, topicName, consumerName, "1", "" ); checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), { - "stream.internal_write.buffer_brimmed_duration_ms" + "api.topic_service.stream_write.buffer_brimmed_milliseconds" }, topicName, "", "1", "" ); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index c4d2620d22a..f01e29cc704 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -2780,7 +2780,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { "PartitionsReleased", "PartitionsToBeLocked", "PartitionsToBeReleased", - "WaitsForData" + "WaitsForData", }, "", "cluster", "", "" ); |