summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <[email protected]>2022-12-04 21:25:13 +0300
committermokhotskii <[email protected]>2022-12-04 21:25:13 +0300
commit1ecfe41ec264d63b4ddc3605ae7ed150c2996e91 (patch)
treee8b23c6bf1a3f9ad162ff3d5b807b90b423c9b63
parent3680ca61e6e893b6ddbba096616c807efd3d7c2b (diff)
Update metrics names
Update metrics names: * replace stream. prefix with topic. * introduce api.topic_service. prefix * introduce api.data_streams. prefix * drop some metrics
-rw-r--r--ydb/core/http_proxy/custom_metrics.h113
-rw-r--r--ydb/core/http_proxy/events.h4
-rw-r--r--ydb/core/http_proxy/http_req.cpp32
-rw-r--r--ydb/core/persqueue/partition.cpp27
-rw-r--r--ydb/core/persqueue/user_info.h16
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp2
-rw-r--r--ydb/core/persqueue/ut/resources/counters_datastreams.html69
-rw-r--r--ydb/library/persqueue/tests/counters.cpp12
-rw-r--r--ydb/library/persqueue/tests/counters.h4
-rw-r--r--ydb/library/persqueue/topic_parser/counters.cpp2
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp4
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp15
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp35
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp85
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp2
15 files changed, 256 insertions, 166 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h
index 0b7bc5e3ee3..df27e091ccf 100644
--- a/ydb/core/http_proxy/custom_metrics.h
+++ b/ydb/core/http_proxy/custom_metrics.h
@@ -3,7 +3,6 @@
#include "events.h"
#include "http_req.h"
-
namespace NKikimr::NHttpProxy {
using namespace Ydb::DataStreams::V1;
@@ -17,23 +16,31 @@ void FillOutputCustomMetrics(const TProtoResult& result, const THttpRequestConte
Y_UNUSED(result, httpContext, ctx);
}
-TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name) {
+TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name, bool setTopicPrefix = true) {
+ const TString topicPrefix = setTopicPrefix ? "topic" : "stream";
if (method.empty()) {
return {{"cloud", httpContext.CloudId}, {"folder", httpContext.FolderId},
- {"database", httpContext.DatabaseId}, {"stream", httpContext.StreamName},
+ {"database", httpContext.DatabaseId}, {topicPrefix, httpContext.StreamName},
{"name", name}};
}
return {{"method", method}, {"cloud", httpContext.CloudId}, {"folder", httpContext.FolderId},
- {"database", httpContext.DatabaseId}, {"stream", httpContext.StreamName},
+ {"database", httpContext.DatabaseId}, {topicPrefix, httpContext.StreamName},
{"name", name}};
}
+static const bool setStreamPrefix{false};
+
template <>
void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) {
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{request.records_size(), true, true,
+ BuildLabels("", httpContext, "stream.incoming_records_per_second", setStreamPrefix)
+ });
+
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{request.records_size(), true, true,
- BuildLabels("", httpContext, "stream.incoming_records_per_second")
+ BuildLabels("", httpContext, "topic.written_messages_per_second")
});
i64 bytes = 0;
@@ -41,38 +48,62 @@ void FillInputCustomMetrics<PutRecordsRequest>(const PutRecordsRequest& request,
bytes += rec.data().size() + rec.partition_key().size() + rec.explicit_hash_key().size();
}
+ /* deprecated metric */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{bytes, true, true,
+ BuildLabels("", httpContext, "stream.incoming_bytes_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.incoming_bytes_per_second")
+ BuildLabels("", httpContext, "topic.written_bytes_per_second")
});
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{bytes, true, true,
+ BuildLabels("", httpContext, "stream.put_records.bytes_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.put_records.bytes_per_second")
+ BuildLabels("", httpContext, "api.data_streams.put_records.bytes_per_second")
});
}
template <>
void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) {
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels("", httpContext, "stream.incoming_records_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.incoming_records_per_second")
+ BuildLabels("", httpContext, "topic.written_messages_per_second")
+ });
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels("", httpContext, "stream.put_record.records_per_second", setStreamPrefix)
});
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.put_record.records_per_second")
+ BuildLabels("", httpContext, "api.data_streams.put_record.messages_per_second")
});
i64 bytes = request.data().size() + request.partition_key().size() + request.explicit_hash_key().size();
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{bytes, true, true,
+ BuildLabels("", httpContext, "stream.incoming_bytes_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.incoming_bytes_per_second")
+ BuildLabels("", httpContext, "topic.written_bytes_per_second")
});
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{bytes, true, true,
+ BuildLabels("", httpContext, "stream.put_record.bytes_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.put_record.bytes_per_second")
+ BuildLabels("", httpContext, "api.data_streams.put_record.bytes_per_second")
});
}
@@ -80,9 +111,13 @@ void FillInputCustomMetrics<PutRecordRequest>(const PutRecordRequest& request, c
template <>
void FillOutputCustomMetrics<PutRecordResult>(const PutRecordResult& result, const THttpRequestContext& httpContext, const TActorContext& ctx) {
Y_UNUSED(result);
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels("", httpContext, "stream.put_record.success_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.put_record.success_per_second")
+ BuildLabels("", httpContext, "api.data_streams.put_record.success_per_second")
});
}
@@ -92,24 +127,40 @@ void FillOutputCustomMetrics<PutRecordsResult>(const PutRecordsResult& result, c
i64 failed = result.failed_record_count();
i64 success = result.records_size() - failed;
if (success > 0) {
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels("", httpContext, "stream.put_records.success_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.put_records.success_per_second")
+ BuildLabels("", httpContext, "api.data_streams.put_records.success_per_second")
+ });
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{success, true, true,
+ BuildLabels("", httpContext, "stream.put_records.successfull_records_per_second", setStreamPrefix)
});
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{success, true, true,
- BuildLabels("", httpContext, "stream.put_records.successfull_records_per_second")
+ BuildLabels("", httpContext, "api.data_streams.put_records.successfull_messages_per_second")
});
}
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{result.records_size(), true, true,
+ BuildLabels("", httpContext, "stream.put_records.total_records_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{result.records_size(), true, true,
- BuildLabels("", httpContext, "stream.put_records.total_records_per_second")
+ BuildLabels("", httpContext, "api.data_streams.put_records.total_messages_per_second")
});
if (failed > 0) {
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{failed, true, true,
+ BuildLabels("", httpContext, "streams.put_records.failed_records_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{failed, true, true,
- BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
+ BuildLabels("", httpContext, "api.data_streams.put_records.failed_messages_per_second")
});
}
}
@@ -127,25 +178,45 @@ void FillOutputCustomMetrics<GetRecordsResult>(const GetRecordsResult& result, c
;
});
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels("", httpContext, "stream.get_records.success_per_second", setStreamPrefix)}
+ );
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{records_n, true, true,
+ BuildLabels("", httpContext, "stream.get_records.records_per_second", setStreamPrefix)}
+ );
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{bytes, true, true,
+ BuildLabels("", httpContext, "stream.get_records.bytes_per_second", setStreamPrefix)}
+ );
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{records_n, true, true,
+ BuildLabels("", httpContext, "stream.outgoing_records_per_second", setStreamPrefix)}
+ );
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{bytes, true, true,
+ BuildLabels("", httpContext, "stream.outgoing_bytes_per_second", setStreamPrefix)}
+ );
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels("", httpContext, "stream.get_records.success_per_second")}
+ BuildLabels("", httpContext, "api.data_streams.get_records.success_per_second")}
);
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{records_n, true, true,
- BuildLabels("", httpContext, "stream.get_records.records_per_second")}
+ BuildLabels("", httpContext, "api.data_streams.get_records.messages_per_second")}
);
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.get_records.bytes_per_second")}
+ BuildLabels("", httpContext, "api.data_streams.get_records.bytes_per_second")}
);
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{records_n, true, true,
- BuildLabels("", httpContext, "stream.outgoing_records_per_second")}
+ BuildLabels("", httpContext, "topic.read_messages_per_second")}
);
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{bytes, true, true,
- BuildLabels("", httpContext, "stream.outgoing_bytes_per_second")}
+ BuildLabels("", httpContext, "topic.read_bytes_per_second")}
);
}
diff --git a/ydb/core/http_proxy/events.h b/ydb/core/http_proxy/events.h
index cd32e92c34b..c859597ebe2 100644
--- a/ydb/core/http_proxy/events.h
+++ b/ydb/core/http_proxy/events.h
@@ -130,7 +130,6 @@ namespace NKikimr::NHttpProxy {
TEvClientReady() {}
};
-
struct TEvError : public TEventLocal<TEvError, EvError> {
NYdb::EStatus Status;
TString Response;
@@ -140,9 +139,6 @@ namespace NKikimr::NHttpProxy {
, Response(response)
{}
};
-
-
-
};
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index dbb2926d81e..ab908646594 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -392,7 +392,7 @@ namespace NKikimr::NHttpProxy {
}
void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText) {
- ctx.Send(MakeMetricsServiceID(),
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{
1, true, true,
{{"method", Method},
@@ -404,6 +404,18 @@ namespace NKikimr::NHttpProxy {
{"name", "api.http.errors_per_second"}}
});
+ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{
+ 1, true, true,
+ {{"method", Method},
+ {"cloud", HttpContext.CloudId},
+ {"folder", HttpContext.FolderId},
+ {"database", HttpContext.DatabaseId},
+ {"topic", HttpContext.StreamName},
+ {"code", TStringBuilder() << (int)StatusToHttpCode(status)},
+ {"name", "api.data_streams.errors_per_second"}}
+ });
+
HttpContext.ResponseData.Status = status;
HttpContext.ResponseData.ErrorText = errorText;
HttpContext.DoReply(ctx);
@@ -431,9 +443,13 @@ namespace NKikimr::NHttpProxy {
}
FillInputCustomMetrics<TProtoRequest>(Request, HttpContext, ctx);
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels(Method, HttpContext, "api.http.requests_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels(Method, HttpContext, "api.http.requests_per_second")
+ BuildLabels(Method, HttpContext, "api.data_streams.requests_per_second")
});
CreateClient(ctx);
return;
@@ -444,9 +460,13 @@ namespace NKikimr::NHttpProxy {
void ReportLatencyCounters(const TActorContext& ctx) {
TDuration dur = ctx.Now() - StartTime;
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1,
+ BuildLabels(Method, HttpContext, "api.http.requests_duration_milliseconds", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvHistCounter{static_cast<i64>(dur.MilliSeconds()), 1,
- BuildLabels(Method, HttpContext, "api.http.requests_duration_milliseconds")
+ BuildLabels(Method, HttpContext, "api.data_streams.requests_duration_milliseconds")
});
}
@@ -458,9 +478,13 @@ namespace NKikimr::NHttpProxy {
*(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
ReportLatencyCounters(ctx);
+ /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(),
+ new TEvServerlessProxy::TEvCounter{1, true, true,
+ BuildLabels(Method, HttpContext, "api.http.success_per_second", setStreamPrefix)
+ });
ctx.Send(MakeMetricsServiceID(),
new TEvServerlessProxy::TEvCounter{1, true, true,
- BuildLabels(Method, HttpContext, "api.http.success_per_second")
+ BuildLabels(Method, HttpContext, "api.data_streams.success_per_second")
});
HttpContext.DoReply(ctx);
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 3625bbf130b..1f654e259b8 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -858,14 +858,14 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{{"cloud", CloudId},
{"folder", FolderId},
{"database", DbId},
- {"stream", TopicConverter->GetFederationPath()},
+ {"topic", TopicConverter->GetFederationPath()},
{"host", DCId},
- {"shard", ToString<ui32>(Partition)}},
- {"name", "stream.internal_write.buffer_brimmed_duration_ms", true});
+ {"partition", ToString<ui32>(Partition)}},
+ {"name", "api.topic_service.stream_write.buffer_brimmed_milliseconds", true});
InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForStream(counters), labels,
- {{"name", "stream.internal_write.time_lags_milliseconds"}}, "bin",
+ {{"name", "topic.write_lag_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"}, {5000, "5000"},
@@ -874,7 +874,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForStream(counters), labels,
- {{"name", "stream.internal_write.record_size_bytes"}}, "bin",
+ {{"name", "topic.written_message_size_bytes"}}, "bin",
TVector<std::pair<ui64, TString>>{
{1024, "1024"}, {5120, "5120"}, {10'240, "10240"},
{20'480, "20480"}, {51'200, "51200"}, {102'400, "102400"},
@@ -885,20 +885,17 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
BytesWritten = NKikimr::NPQ::TMultiCounter(
NPersQueue::GetCountersForStream(counters), labels, {},
- {"stream.internal_write.bytes_per_second",
- "stream.incoming_bytes_per_second"} , true, "name");
+ {"api.topic_service.stream_write.bytes_per_second",
+ "topic.written_bytes_per_second"} , true, "name");
MsgsWritten = NKikimr::NPQ::TMultiCounter(
NPersQueue::GetCountersForStream(counters), labels, {},
- {"stream.internal_write.records_per_second",
- "stream.incoming_records_per_second"}, true, "name");
+ {"api.topic_service.stream_write.messages_per_second",
+ "topic.written_messages_per_second"}, true, "name");
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
NPersQueue::GetCountersForStream(counters), labels, {},
- {"stream.internal_write.uncompressed_bytes_per_second"}, true, "name");
- BytesWrittenComp = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForStream(counters), labels, {},
- {"stream.internal_write.compacted_bytes_per_second"}, true, "name");
+ {"topic.written_uncompressed_bytes_per_second"}, true, "name");
TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
@@ -912,7 +909,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForStream(counters), labels,
- {{"name", "stream.internal_write.topic_write_quota_wait_milliseconds"}}, "bin",
+ {{"name", "api.topic_service.stream_write.topic_throttled_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
{20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
@@ -923,7 +920,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForStream(counters), labels,
- {{"name", "stream.internal_write.partition_write_quota_wait_milliseconds"}}, "bin",
+ {{"name", "api.topic_service.stream_write.partition_throttled_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
{20, "20"}, {50, "50"}, {100, "100"}, {500, "500"},
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index dfc2dcde0e2..168942c61d4 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -328,22 +328,22 @@ struct TUserInfo {
BytesRead = TMultiCounter(subgroup,
aggregates, {{"consumer", User}},
- {"stream.internal_read.bytes_per_second",
- "stream.outgoing_bytes_per_second"}, true, "name");
+ {"api.topic_service.stream_read.bytes_per_second",
+ "topic.read_bytes_per_second"}, true, "name");
MsgsRead = TMultiCounter(subgroup,
aggregates, {{"consumer", User}},
- {"stream.internal_read.records_per_second",
- "stream.outgoing_records_per_second"}, true, "name");
+ {"api.topic_service.stream_read.messages_per_second",
+ "topic.read_messages_per_second"}, true, "name");
Counter.SetCounter(subgroup,
{{"cloud", cloudId}, {"folder", folderId}, {"database", dbId},
- {"stream", TopicConverter->GetFederationPath()},
- {"consumer", User}, {"host", dcId}, {"shard", partition}},
- {"name", "stream.await_operating_milliseconds", true});
+ {"topic", TopicConverter->GetFederationPath()},
+ {"consumer", User}, {"host", dcId}, {"partition", partition}},
+ {"name", "topic.awaiting_consume_milliseconds", true});
ReadTimeLag.reset(new TPercentileCounter(
NPersQueue::GetCountersForStream(AppData(ctx)->Counters), aggregates,
- {{"consumer", User}, {"name", "stream.internal_read.time_lags_milliseconds"}}, "bin",
+ {{"consumer", User}, {"name", "topic.read_lag_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"},
{5000, "5000"}, {10'000, "10000"},
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index d57e5570337..f3d91e1d67e 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -130,7 +130,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
TStringStream countersStr;
dbGroup->OutputHtml(countersStr);
const TString referenceCounters = NResource::Find(TStringBuf("counters_datastreams.html"));
- UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters);
+ UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters);
}
}
diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html
index dfe910e132d..b52e418d24e 100644
--- a/ydb/core/persqueue/ut/resources/counters_datastreams.html
+++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html
@@ -5,29 +5,28 @@ cloud=somecloud:
database=PQ:
- stream=topic:
- name=stream.incoming_bytes_per_second: 2700
- name=stream.incoming_records_per_second: 150
- name=stream.internal_write.bytes_per_second: 2700
- name=stream.internal_write.compacted_bytes_per_second: 3720
- name=stream.internal_write.records_per_second: 150
- name=stream.internal_write.uncompressed_bytes_per_second: 1350
+ topic=topic:
+ name=api.topic_service.stream_write.bytes_per_second: 2700
+ name=api.topic_service.stream_write.messages_per_second: 150
+ name=topic.written_bytes_per_second: 2700
+ name=topic.written_messages_per_second: 150
+ name=topic.written_uncompressed_bytes_per_second: 1350
consumer=user:
- name=stream.internal_read.bytes_per_second: 0
- name=stream.internal_read.records_per_second: 0
- name=stream.outgoing_bytes_per_second: 0
- name=stream.outgoing_records_per_second: 0
+ name=api.topic_service.stream_read.bytes_per_second: 0
+ name=api.topic_service.stream_read.messages_per_second: 0
+ name=topic.read_bytes_per_second: 0
+ name=topic.read_messages_per_second: 0
host=1:
- shard=0:
- name=stream.await_operating_milliseconds: 0
+ partition=0:
+ name=topic.awaiting_consume_milliseconds: 0
- shard=1:
- name=stream.await_operating_milliseconds: 0
+ partition=1:
+ name=topic.awaiting_consume_milliseconds: 0
- name=stream.internal_read.time_lags_milliseconds:
+ name=topic.read_lag_milliseconds:
bin=100: 0
bin=1000: 0
bin=10000: 0
@@ -42,13 +41,13 @@ cloud=somecloud:
host=1:
- shard=0:
- name=stream.internal_write.buffer_brimmed_duration_ms: 0
+ partition=0:
+ name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0
- shard=1:
- name=stream.internal_write.buffer_brimmed_duration_ms: 0
+ partition=1:
+ name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0
- name=stream.internal_write.partition_write_quota_wait_milliseconds:
+ name=api.topic_service.stream_write.partition_throttled_milliseconds:
bin=0: 150
bin=1: 0
bin=10: 0
@@ -63,7 +62,20 @@ cloud=somecloud:
bin=5000: 0
bin=999999: 0
- name=stream.internal_write.record_size_bytes:
+ name=topic.write_lag_milliseconds:
+ bin=100: 150
+ bin=1000: 0
+ bin=10000: 0
+ bin=180000: 0
+ bin=200: 0
+ bin=2000: 0
+ bin=30000: 0
+ bin=500: 0
+ bin=5000: 0
+ bin=60000: 0
+ bin=999999: 0
+
+ name=topic.written_message_size_bytes:
bin=1024: 150
bin=10240: 0
bin=102400: 0
@@ -78,17 +90,4 @@ cloud=somecloud:
bin=5242880: 0
bin=67108864: 0
bin=99999999: 0
-
- name=stream.internal_write.time_lags_milliseconds:
- bin=100: 150
- bin=1000: 0
- bin=10000: 0
- bin=180000: 0
- bin=200: 0
- bin=2000: 0
- bin=30000: 0
- bin=500: 0
- bin=5000: 0
- bin=60000: 0
- bin=999999: 0
</pre>
diff --git a/ydb/library/persqueue/tests/counters.cpp b/ydb/library/persqueue/tests/counters.cpp
index 2f3af930800..28600f0f73b 100644
--- a/ydb/library/persqueue/tests/counters.cpp
+++ b/ydb/library/persqueue/tests/counters.cpp
@@ -88,18 +88,18 @@ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, co
NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
const TString& cloudId, const TString& databaseId,
- const TString& folderId, const TString& streamName,
+ const TString& folderId, const TString& topicName,
const TString& consumer, const TString& host,
- const TString& shard) {
+ const TString& partition) {
bool mayFail = false;
- TVector<TString> pathItems = SplitString(streamName, "/");
+ TVector<TString> pathItems = SplitString(topicName, "/");
TStringBuilder queryBuilder;
queryBuilder <<
"/counters/counters=" << counters <<
"/cloud=" << cloudId <<
"/folder=" << folderId <<
"/database=" << databaseId <<
- "/stream=" << JoinRange("%2F", pathItems.begin(), pathItems.end());
+ "/topic=" << JoinRange("%2F", pathItems.begin(), pathItems.end());
if (consumer) {
queryBuilder <<
@@ -111,9 +111,9 @@ NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
"/host=" << host;
}
- if (shard) {
+ if (partition) {
queryBuilder <<
- "/shard=" << shard;
+ "/partition=" << partition;
mayFail = true;
}
diff --git a/ydb/library/persqueue/tests/counters.h b/ydb/library/persqueue/tests/counters.h
index 9b6958623e0..588215b7ae0 100644
--- a/ydb/library/persqueue/tests/counters.h
+++ b/ydb/library/persqueue/tests/counters.h
@@ -17,8 +17,8 @@ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, co
NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
const TString& cloudId, const TString& databaseId,
- const TString& folderId, const TString& streamName,
+ const TString& folderId, const TString& topicName,
const TString& consumer, const TString& host,
- const TString& shard);
+ const TString& partition);
} // NKikimr::NPersQueueTests
diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp
index baf401d165b..695d1c7a669 100644
--- a/ydb/library/persqueue/topic_parser/counters.cpp
+++ b/ydb/library/persqueue/topic_parser/counters.cpp
@@ -43,7 +43,7 @@ TVector<TPQLabelsInfo> GetLabelsForStream(const TTopicConverterPtr& topic, const
{{{"cloud", cloudId}}, {cloudId}},
{{{"folder", folderId}}, {folderId}},
{{{"database", dbId}}, {dbId}},
- {{{"stream", topic->GetClientsideName()}}, {topic->GetClientsideName()}}};
+ {{{"topic", topic->GetClientsideName()}}, {topic->GetClientsideName()}}};
return res;
}
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index 7267c418397..d1b18bf5a2b 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -867,7 +867,9 @@ void TPartitionActor::Handle(TEvPersQueue::TEvHasDataInfoResponse::TPtr& ev, con
if (!WaitForData)
return;
- Counters.WaitsForData.Inc();
+ if (Counters.WaitsForData) {
+ Counters.WaitsForData.Inc();
+ }
Y_VERIFY(record.HasEndOffset());
Y_VERIFY(EndOffset <= record.GetEndOffset()); //end offset could not be changed if no data arrived, but signal will be sended anyway after timeout
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 85a879c661f..3df6f988275 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -816,14 +816,13 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu
auto aggr = NPersQueue::GetLabelsForStream(topic, cloudId, dbId, folderId);
const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}};
- topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true, "name");
- topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true, "name");
- topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false, "name");
- topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false, "name");
- topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false, "name");
- topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true, "name");
- topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true, "name");
- topicCounters.WaitsForData = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true, "name");
+ topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_locked_per_second"}, true, "name");
+ topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_released_per_second"}, true, "name");
+ topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_to_be_released_count"}, false, "name");
+ topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_to_be_locked_count"}, false, "name");
+ topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_locked_count"}, false, "name");
+ topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_errors_per_second"}, true, "name");
+ topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.commits_per_second"}, true, "name");
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index c5857901a45..1283a3c2f5c 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -286,8 +286,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
if (SessionsActive) {
SessionsActive.Dec();
- BytesInflight.Dec(BytesInflight_);
- BytesInflightTotal.Dec(BytesInflightTotal_);
+ if (BytesInflight && BytesInflightTotal) {
+ BytesInflight.Dec(BytesInflight_);
+ BytesInflightTotal.Dec(BytesInflightTotal_);
+ }
}
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " is DEAD");
@@ -522,11 +524,9 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& clou
auto subGroup = NPersQueue::GetCountersForStream(Counters);
auto aggr = NPersQueue::GetLabelsForStream(FullConverter, cloudId, dbId, folderId);
- BytesInflight = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding"}, false, "name");
- BytesInflightTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.bytes_proceeding_total"}, false, "name");
- SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_created_per_second"}, true, "name");
- SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.sessions_active"}, false, "name");
- Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"stream.internal_write.errors_per_second"}, true, "name");
+ SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_created_per_second"}, true, "name");
+ SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_active_count"}, false, "name");
+ Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.errors_per_second"}, true, "name");
SessionsCreated.Inc();
@@ -1022,8 +1022,9 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
AcceptedRequests.emplace_back(std::move(writeRequest));
BytesInflight_ -= diff;
- BytesInflight.Dec(diff);
-
+ if (BytesInflight) {
+ BytesInflight.Dec(diff);
+ }
if (!NextRequestInited && BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended
NextRequestInited = true;
if (!Request->GetStreamCtx()->Read()) {
@@ -1177,7 +1178,9 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
ui64 diff = writeRequest->ByteSize;
BytesInflightTotal_ -= diff;
- BytesInflightTotal.Dec(diff);
+ if (BytesInflightTotal) {
+ BytesInflightTotal.Dec(diff);
+ }
CheckFinish(ctx);
}
@@ -1276,8 +1279,10 @@ void TWriteSessionActor<UseMigrationProtocol>::SendRequest(typename TWriteReques
diff += request->PartitionWriteRequest->Record.ByteSize();
BytesInflight_ += diff;
BytesInflightTotal_ += diff;
- BytesInflight.Inc(diff);
- BytesInflightTotal.Inc(diff);
+ if (BytesInflight && BytesInflightTotal) {
+ BytesInflight.Inc(diff);
+ BytesInflightTotal.Inc(diff);
+ }
ctx.Send(Writer, std::move(request->PartitionWriteRequest));
SentRequests.push_back(std::move(request));
@@ -1466,8 +1471,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e
ui64 diff = ev->Get()->Request.ByteSize();
BytesInflight_ += diff;
BytesInflightTotal_ += diff;
- BytesInflight.Inc(diff);
- BytesInflightTotal.Inc(diff);
+ if (BytesInflight && BytesInflightTotal) {
+ BytesInflight.Inc(diff);
+ BytesInflightTotal.Inc(diff);
+ }
if (BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended
Y_VERIFY(NextRequestInited);
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index fe04c78c367..dbcf9257ddb 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -387,81 +387,76 @@ namespace NKikimr::NPersQueueTests {
checkCounters(monPort,
{
- "stream.internal_read.commits_per_second",
- "stream.internal_read.partitions_errors_per_second",
- "stream.internal_read.partitions_locked",
- "stream.internal_read.partitions_locked_per_second",
- "stream.internal_read.partitions_released_per_second",
- "stream.internal_read.partitions_to_be_locked",
- "stream.internal_read.partitions_to_be_released",
- "stream.internal_read.waits_for_data",
- "stream.internal_write.bytes_proceeding",
- "stream.internal_write.bytes_proceeding_total",
- "stream.internal_write.errors_per_second",
- "stream.internal_write.sessions_active",
- "stream.internal_write.sessions_created_per_second",
+ "api.topic_service.stream_read.commits_per_second",
+ "api.topic_service.stream_read.partitions_errors_per_second",
+ "api.topic_service.stream_read.partitions_locked_count",
+ "api.topic_service.stream_read.partitions_locked_per_second",
+ "api.topic_service.stream_read.partitions_released_per_second",
+ "api.topic_service.stream_read.partitions_to_be_locked_count",
+ "api.topic_service.stream_read.partitions_to_be_released_count",
+ "api.topic_service.stream_write.errors_per_second",
+ "api.topic_service.stream_write.sessions_active_count",
+ "api.topic_service.stream_write.sessions_created_per_second",
},
topicName, "", "", ""
);
checkCounters(monPort,
{
- "stream.internal_read.commits_per_second",
- "stream.internal_read.partitions_errors_per_second",
- "stream.internal_read.partitions_locked",
- "stream.internal_read.partitions_locked_per_second",
- "stream.internal_read.partitions_released_per_second",
- "stream.internal_read.partitions_to_be_locked",
- "stream.internal_read.partitions_to_be_released",
- "stream.internal_read.waits_for_data",
+ "api.topic_service.stream_read.commits_per_second",
+ "api.topic_service.stream_read.partitions_errors_per_second",
+ "api.topic_service.stream_read.partitions_locked_count",
+ "api.topic_service.stream_read.partitions_locked_per_second",
+ "api.topic_service.stream_read.partitions_released_per_second",
+ "api.topic_service.stream_read.partitions_to_be_locked_count",
+ "api.topic_service.stream_read.partitions_to_be_released_count",
},
topicName, consumerName, "", ""
);
checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
{
- "stream.internal_read.time_lags_milliseconds",
- "stream.incoming_bytes_per_second",
- "stream.incoming_records_per_second",
- "stream.internal_write.bytes_per_second",
- "stream.internal_write.compacted_bytes_per_second",
- "stream.internal_write.partition_write_quota_wait_milliseconds",
- "stream.internal_write.record_size_bytes",
- "stream.internal_write.records_per_second",
- "stream.internal_write.time_lags_milliseconds",
- "stream.internal_write.uncompressed_bytes_per_second",
- "stream.await_operating_milliseconds",
- "stream.internal_write.buffer_brimmed_duration_ms",
- "stream.internal_read.bytes_per_second",
- "stream.internal_read.records_per_second",
- "stream.outgoing_bytes_per_second",
- "stream.outgoing_records_per_second",
+ "topic.read_lag_milliseconds",
+ "topic.written_bytes_per_second",
+ "topic.written_messages_per_second",
+ "api.topic_service.stream_write.bytes_per_second",
+ "api.topic_service.stream_write.partition_throttled_milliseconds",
+ "topic.written_message_size_bytes",
+ "api.topic_service.stream_write.messages_per_second",
+ "topic.write_lag_milliseconds",
+ "topic.written_uncompressed_bytes_per_second",
+ "topic.awaiting_consume_milliseconds",
+ "api.topic_service.stream_write.buffer_brimmed_milliseconds",
+ "api.topic_service.stream_read.bytes_per_second",
+ "api.topic_service.stream_read.messages_per_second",
+ "topic.read_bytes_per_second",
+ "topic.read_messages_per_second",
},
topicName, "", "", ""
);
checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
{
- "stream.internal_read.time_lags_milliseconds",
- "stream.await_operating_milliseconds",
- "stream.internal_read.bytes_per_second",
- "stream.internal_read.records_per_second",
- "stream.outgoing_bytes_per_second",
- "stream.outgoing_records_per_second",
+ "topic.read_lag_milliseconds",
+ "topic.awaiting_consume_milliseconds",
+ "api.topic_service.stream_read.bytes_per_second",
+ "api.topic_service.stream_read.messages_per_second",
+ "topic.read_bytes_per_second",
+ "topic.read_messages_per_second",
},
topicName, consumerName, "", ""
);
checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
{
- "stream.await_operating_milliseconds"
+ "topic.awaiting_consume_milliseconds"
},
topicName, consumerName, "1", ""
);
checkCounters(server.CleverServer->GetRuntime()->GetMonPort(),
{
- "stream.internal_write.buffer_brimmed_duration_ms"
+ "api.topic_service.stream_write.buffer_brimmed_milliseconds"
},
topicName, "", "1", ""
);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index c4d2620d22a..f01e29cc704 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -2780,7 +2780,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
"PartitionsReleased",
"PartitionsToBeLocked",
"PartitionsToBeReleased",
- "WaitsForData"
+ "WaitsForData",
},
"", "cluster", "", ""
);