aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@ydb.tech>2022-12-08 20:07:47 +0300
committermokhotskii <mokhotskii@ydb.tech>2022-12-08 20:07:47 +0300
commit960c46a34e102e33a8cd99b01068822e91eb36c1 (patch)
tree13827ff34e44873e147abf9f0ff5b19fa33d4d28
parent59d0766faf858740aa743d112c23b9f805041bad (diff)
downloadydb-960c46a34e102e33a8cd99b01068822e91eb36c1.tar.gz
Add database label to PQ metrics
Add database label to PQ metrics
-rw-r--r--ydb/core/http_proxy/custom_metrics.h4
-rw-r--r--ydb/core/persqueue/partition.cpp28
-rw-r--r--ydb/core/persqueue/partition.h4
-rw-r--r--ydb/core/persqueue/pq_impl.cpp7
-rw-r--r--ydb/core/persqueue/pq_impl.h1
-rw-r--r--ydb/core/persqueue/user_info.cpp5
-rw-r--r--ydb/core/persqueue/user_info.h19
-rw-r--r--ydb/core/persqueue/ut/resources/counters_datastreams.html16
-rw-r--r--ydb/core/persqueue/ut/user_action_processor_ut.cpp1
-rw-r--r--ydb/core/sys_view/processor/db_counters.cpp6
-rw-r--r--ydb/core/sys_view/processor/processor_impl.h2
-rw-r--r--ydb/core/sys_view/ut_labeled.cpp5
-rw-r--r--ydb/core/tablet/private/aggregated_counters.cpp1
-rw-r--r--ydb/core/tablet/private/labeled_db_counters.cpp1
-rw-r--r--ydb/core/tablet/private/labeled_db_counters.h1
-rw-r--r--ydb/library/persqueue/tests/counters.cpp2
-rw-r--r--ydb/library/persqueue/tests/counters.h1
-rw-r--r--ydb/library/persqueue/topic_parser/counters.cpp7
-rw-r--r--ydb/library/persqueue/topic_parser/counters.h5
-rw-r--r--ydb/services/lib/actors/type_definitions.h6
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp5
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.h2
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp8
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h2
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp7
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp5
26 files changed, 92 insertions, 59 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h
index 39ddcba5a3..0581f19e11 100644
--- a/ydb/core/http_proxy/custom_metrics.h
+++ b/ydb/core/http_proxy/custom_metrics.h
@@ -18,12 +18,12 @@ void FillOutputCustomMetrics(const TProtoResult& result, const THttpRequestConte
TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name) {
if (method.empty()) {
- return {{"database", httpContext.DatabaseName}, {"cloud_id", httpContext.CloudId},
+ return { {"database", httpContext.DatabaseName}, {"cloud_id", httpContext.CloudId},
{"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId},
{"topic", httpContext.StreamName}, {"name", name}};
}
- return {{"database", httpContext.DatabaseName}, {"method", method}, {"cloud_id", httpContext.CloudId},
+ return { {"database", httpContext.DatabaseName}, {"method", method}, {"cloud_id", httpContext.CloudId},
{"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId},
{"topic", httpContext.StreamName}, {"name", name}};
}
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 40a7b6f535..32b9974441 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -458,7 +458,7 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
}
TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
- const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId,
+ const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless,
const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters,
bool newPartition)
: TabletID(tabletId)
@@ -480,10 +480,12 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, GapSize(0)
, CloudId(config.GetYcCloudId())
, DbId(config.GetYdbDatabaseId())
+ , DbPath(config.GetYdbDatabasePath())
+ , IsServerless(isServerless)
, FolderId(config.GetYcFolderId())
, UsersInfoStorage(
DCId, TabletID, TopicConverter, Partition, counters, Config,
- CloudId, DbId, config.GetYdbDatabasePath(), FolderId
+ CloudId, DbId, config.GetYdbDatabasePath(), IsServerless, FolderId
)
, ReadingTimestamp(false)
, Cookie(0)
@@ -851,11 +853,13 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
void TPartition::SetupStreamCounters(const TActorContext& ctx) {
const auto topicName = TopicConverter->GetModernName();
auto counters = AppData(ctx)->Counters;
- auto labels = NPersQueue::GetLabelsForTopic(TopicConverter, CloudId, DbId, FolderId);
+ auto labels = NPersQueue::GetLabelsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId);
WriteBufferIsFullCounter.SetCounter(
- NPersQueue::GetCountersForDataStream(counters),
- {{"cloud_id", CloudId},
+ NPersQueue::GetCountersForTopic(counters, IsServerless),
+ {
+ {"database", DbPath},
+ {"cloud_id", CloudId},
{"folder_id", FolderId},
{"database_id", DbId},
{"topic", TopicConverter->GetFederationPath()},
@@ -864,7 +868,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{"name", "api.topic_service.stream_write.buffer_brimmed_milliseconds", true});
InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForDataStream(counters), labels,
+ NPersQueue::GetCountersForTopic(counters, IsServerless), labels,
{{"name", "topic.write_lag_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{100, "100"}, {200, "200"}, {500, "500"},
@@ -873,7 +877,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{180'000,"180000"}, {9'999'999, "999999"}}, true));
MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForDataStream(counters), labels,
+ NPersQueue::GetCountersForTopic(counters, IsServerless), labels,
{{"name", "topic.written_message_size_bytes"}}, "bin",
TVector<std::pair<ui64, TString>>{
{1024, "1024"}, {5120, "5120"}, {10'240, "10240"},
@@ -883,17 +887,17 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{67'108'864, "67108864"}, {999'999'999, "99999999"}}, true));
BytesWritten = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForDataStream(counters), labels, {},
+ NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {},
{"api.topic_service.stream_write.bytes_per_second",
"topic.written_bytes_per_second"} , true, "name");
MsgsWritten = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForDataStream(counters), labels, {},
+ NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {},
{"api.topic_service.stream_write.messages_per_second",
"topic.written_messages_per_second"}, true, "name");
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
- NPersQueue::GetCountersForDataStream(counters), labels, {},
+ NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {},
{"topic.written_uncompressed_bytes_per_second"}, true, "name");
TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
@@ -907,7 +911,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForDataStream(counters), labels,
+ NPersQueue::GetCountersForTopic(counters, IsServerless), labels,
{{"name", "api.topic_service.stream_write.topic_throttled_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
@@ -918,7 +922,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
- NPersQueue::GetCountersForDataStream(counters), labels,
+ NPersQueue::GetCountersForTopic(counters, IsServerless), labels,
{{"name", "api.topic_service.stream_write.partition_throttled_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{
{0, "0"}, {1, "1"}, {5, "5"}, {10, "10"},
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 98aec8de7b..34c51ce7f8 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -231,7 +231,7 @@ public:
}
TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
- const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId,
+ const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless,
const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters,
bool newPartition = false);
@@ -441,6 +441,8 @@ private:
TString CloudId;
TString DbId;
+ TString DbPath;
+ bool IsServerless;
TString FolderId;
TUsersInfoStorage UsersInfoStorage;
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index d78a6cc03b..a5f1c35a79 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -658,7 +658,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
if (Partitions.find(partitionId) == Partitions.end()) {
Partitions.emplace(partitionId, TPartitionInfo(
ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter,
- IsLocalDC, DCId, Config, *Counters,
+ IsLocalDC, DCId, IsServerless, Config, *Counters,
true)),
GetPartitionKeyRange(partition),
true,
@@ -789,7 +789,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
const auto partitionId = partition.GetPartitionId();
Partitions.emplace(partitionId, TPartitionInfo(
ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter,
- IsLocalDC, DCId, Config, *Counters,
+ IsLocalDC, DCId, IsServerless, Config, *Counters,
false)),
GetPartitionKeyRange(partition),
false,
@@ -2145,6 +2145,9 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info)
void TPersQueue::CreatedHook(const TActorContext& ctx)
{
+
+ IsServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
+
ctx.Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(ctx.SelfID.NodeId()));
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 2b9c9cbe97..c3fdd43337 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -150,6 +150,7 @@ private:
NPersQueue::TTopicConverterPtr TopicConverter;
bool IsLocalDC;
TString DCId;
+ bool IsServerless = false;
TVector<NScheme::TTypeInfo> KeySchema;
NKikimrPQ::TPQTabletConfig Config;
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp
index 09434b8228..6cdb214c5d 100644
--- a/ydb/core/persqueue/user_info.cpp
+++ b/ydb/core/persqueue/user_info.cpp
@@ -38,6 +38,7 @@ TUsersInfoStorage::TUsersInfoStorage(
const TString& cloudId,
const TString& dbId,
const TString& dbPath,
+ const bool isServerless,
const TString& folderId
)
: DCId(std::move(dcId))
@@ -48,6 +49,7 @@ TUsersInfoStorage::TUsersInfoStorage(
, CloudId(cloudId)
, DbId(dbId)
, DbPath(dbPath)
+ , IsServerless(isServerless)
, FolderId(folderId)
, CurReadRuleGeneration(0)
{
@@ -177,11 +179,10 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
bool meterRead = userServiceType.empty() || userServiceType == defaultServiceType;
- TMaybe<TString> dbPath = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() ? TMaybe<TString>(DbPath) : Nothing();
return {
ctx, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition,
- session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, CloudId, DbId, dbPath, FolderId,
+ session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, CloudId, DbId, DbPath, IsServerless, FolderId,
meterRead, burst, speed
};
}
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index d9559779c9..7e936e9940 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -266,7 +266,7 @@ struct TUserInfo {
const ui64 readRuleGeneration, const bool important, const NPersQueue::TTopicConverterPtr& topicConverter,
const ui32 partition, const TString &session, ui32 gen, ui32 step, i64 offset,
const ui64 readOffsetRewindSum, const TString& dcId, TInstant readFromTimestamp,
- const TString& cloudId, const TString& dbId, const TMaybe<TString>& dbPath, const TString& folderId, bool meterRead,
+ const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId, bool meterRead,
ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000
)
: ReadSpeedLimiter(std::move(readSpeedLimiter))
@@ -303,10 +303,10 @@ struct TUserInfo {
if (AppData(ctx)->Counters) {
if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
LabeledCounters.Reset(new TUserLabeledCounters(
- user + "||" + topicConverter->GetClientsideName(), partition, *dbPath));
+ user + "||" + topicConverter->GetClientsideName(), partition, dbPath));
if (DoInternalRead) {
- SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, folderId);
+ SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, dbPath, isServerless, folderId);
}
} else {
LabeledCounters.Reset(new TUserLabeledCounters(
@@ -320,11 +320,11 @@ struct TUserInfo {
void SetupStreamCounters(
const TActorContext& ctx, const TString& dcId, const TString& partition,
- const TString& cloudId, const TString& dbId, const TString& folderId
+ const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId
) {
- auto subgroup = NPersQueue::GetCountersForDataStream(AppData(ctx)->Counters);
+ auto subgroup = NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless);
auto aggregates =
- NPersQueue::GetLabelsForTopic(TopicConverter, cloudId, dbId, folderId);
+ NPersQueue::GetLabelsForTopic(TopicConverter, cloudId, dbId, dbPath, folderId);
BytesRead = TMultiCounter(subgroup,
aggregates, {{"consumer", User}},
@@ -336,13 +336,13 @@ struct TUserInfo {
"topic.read_messages_per_second"}, true, "name");
Counter.SetCounter(subgroup,
- {{"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId},
+ {{"database", dbPath}, {"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId},
{"topic", TopicConverter->GetFederationPath()},
{"consumer", User}, {"host", dcId}, {"partition", partition}},
{"name", "topic.awaiting_consume_milliseconds", true});
ReadTimeLag.reset(new TPercentileCounter(
- NPersQueue::GetCountersForDataStream(AppData(ctx)->Counters), aggregates,
+ NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless), aggregates,
{{"consumer", User}, {"name", "topic.read_lag_milliseconds"}}, "bin",
TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"},
{1000, "1000"}, {2000, "2000"},
@@ -487,7 +487,7 @@ class TUsersInfoStorage {
public:
TUsersInfoStorage(TString dcId, ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, ui32 partition,
const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config,
- const TString& CloudId, const TString& DbId, const TString& DbPath, const TString& FolderId);
+ const TString& CloudId, const TString& DbId, const TString& DbPath, const bool isServerless, const TString& FolderId);
void Init(TActorId tabletActor, TActorId partitionActor);
@@ -542,6 +542,7 @@ private:
TString CloudId;
TString DbId;
TString DbPath;
+ bool IsServerless;
TString FolderId;
mutable ui64 CurReadRuleGeneration;
};
diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html
index 7c39ad45a5..f04c71f0c9 100644
--- a/ydb/core/persqueue/ut/resources/counters_datastreams.html
+++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html
@@ -1,10 +1,10 @@
<pre>
topic=topic:
- name=api.topic_service.stream_write.bytes_per_second: 2700
- name=api.topic_service.stream_write.messages_per_second: 150
- name=topic.written_bytes_per_second: 2700
- name=topic.written_messages_per_second: 150
- name=topic.written_uncompressed_bytes_per_second: 1350
+ name=api.topic_service.stream_write.bytes_per_second: 3240
+ name=api.topic_service.stream_write.messages_per_second: 180
+ name=topic.written_bytes_per_second: 3240
+ name=topic.written_messages_per_second: 180
+ name=topic.written_uncompressed_bytes_per_second: 1620
consumer=user:
name=api.topic_service.stream_read.bytes_per_second: 0
@@ -42,7 +42,7 @@ topic=topic:
name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0
name=api.topic_service.stream_write.partition_throttled_milliseconds:
- bin=0: 150
+ bin=0: 180
bin=1: 0
bin=10: 0
bin=100: 0
@@ -57,7 +57,7 @@ topic=topic:
bin=999999: 0
name=topic.write_lag_milliseconds:
- bin=100: 150
+ bin=100: 180
bin=1000: 0
bin=10000: 0
bin=180000: 0
@@ -70,7 +70,7 @@ topic=topic:
bin=999999: 0
name=topic.written_message_size_bytes:
- bin=1024: 150
+ bin=1024: 180
bin=10240: 0
bin=102400: 0
bin=1048576: 0
diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp
index 95ad7cc2dc..5ea3c4b821 100644
--- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp
+++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp
@@ -162,6 +162,7 @@ void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, bool newPartitio
topicConverter,
true,
"dcId",
+ false,
config,
*tabletCounters,
newPartition);
diff --git a/ydb/core/sys_view/processor/db_counters.cpp b/ydb/core/sys_view/processor/db_counters.cpp
index c941b875ad..ba6d9212bd 100644
--- a/ydb/core/sys_view/processor/db_counters.cpp
+++ b/ydb/core/sys_view/processor/db_counters.cpp
@@ -240,6 +240,7 @@ static void SwapStatefulCounters(NKikimr::NSysView::TDbServiceCounters* dst,
SwapSimpleCounters(dstReq->MutableRequestCounters(), *srcReq.MutableRequestCounters());
}
+ dst->ClearLabeledCounters();
for (auto& srcReq : *src.MutableLabeledCounters()) {
auto* dstReq = dst->FindOrAddLabeledCounters(srcReq.GetAggregatedPerTablets().GetGroup());
SwapLabeledCounters(dstReq->MutableAggregatedPerTablets(), *srcReq.MutableAggregatedPerTablets());
@@ -339,7 +340,7 @@ void TSysViewProcessor::AttachExternalCounters() {
->GetSubgroup("database_id", DatabaseId)
->RegisterSubgroup("host", "", ExternalGroup);
- GetServiceCounters(AppData()->Counters, "labeled", false)
+ GetServiceCounters(AppData()->Counters, "labeled_serverless", false)
->GetSubgroup("database", Database)
->GetSubgroup("cloud_id", CloudId)
->GetSubgroup("folder_id", FolderId)
@@ -367,7 +368,7 @@ void TSysViewProcessor::DetachExternalCounters() {
GetServiceCounters(AppData()->Counters, "ydb_serverless", false)
->RemoveSubgroup("database", Database);
- GetServiceCounters(AppData()->Counters, "labeled", false)
+ GetServiceCounters(AppData()->Counters, "labeled_severless", false)
->RemoveSubgroup("database", Database);
}
@@ -472,7 +473,6 @@ void TSysViewProcessor::Handle(TEvSysView::TEvSendDbLabeledCountersRequest::TPtr
incomingServicesSet.insert(service);
auto& simpleState = state.Simple[service];
- simpleState.ClearLabeledCounters();
SwapStatefulCounters(&simpleState, *serviceCounters.MutableCounters());
}
diff --git a/ydb/core/sys_view/processor/processor_impl.h b/ydb/core/sys_view/processor/processor_impl.h
index 904f545ad8..c781ed0f21 100644
--- a/ydb/core/sys_view/processor/processor_impl.h
+++ b/ydb/core/sys_view/processor/processor_impl.h
@@ -274,7 +274,7 @@ private:
// interval of db counters processing
static constexpr TDuration ProcessCountersInterval = TDuration::Seconds(5);
// interval of db labeled counters processing
- static constexpr TDuration ProcessLabeledCountersInterval = TDuration::Minutes(1);
+ static constexpr TDuration ProcessLabeledCountersInterval = TDuration::Seconds(60);
// interval of sending next navigate request
static constexpr TDuration SendNavigateInterval = TDuration::Seconds(5);
diff --git a/ydb/core/sys_view/ut_labeled.cpp b/ydb/core/sys_view/ut_labeled.cpp
index 2ef24fdd2d..53093bc7ff 100644
--- a/ydb/core/sys_view/ut_labeled.cpp
+++ b/ydb/core/sys_view/ut_labeled.cpp
@@ -36,6 +36,7 @@ void CreateDatabase(TTestEnv& env, const TString& databaseName) {
bool CheckCounter(::NMonitoring::TDynamicCounterPtr group, const char* sensorName, ui32 refValue,
bool isDerivative) {
auto value = group->GetNamedCounter("name", sensorName, isDerivative)->Val();
+ Cerr << "CHECK COUNTER " << sensorName << " wait " << refValue << " got " << value << "\n";
return (value == refValue);
}
@@ -77,7 +78,7 @@ void GetCounters(TTestEnv& env, const TString& databaseName, const TString& data
for (ui32 nodeId = 0; nodeId < env.GetServer().GetRuntime()->GetNodeCount(); ++nodeId) {
auto counters = env.GetServer().GetRuntime()->GetAppData(nodeId).Counters;
- auto labeledGroup = GetServiceCounters(counters, "labeled", false);
+ auto labeledGroup = GetServiceCounters(counters, "labeled_serverless", false);
Y_VERIFY(labeledGroup);
auto databaseGroup = labeledGroup->FindSubgroup("database", databasePath);
@@ -231,7 +232,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) {
return isGood;
};
- Sleep(TDuration::Minutes(1));
+ Sleep(TDuration::Seconds(30));
GetCounters(env, databaseName, databasePath, check);
}
}
diff --git a/ydb/core/tablet/private/aggregated_counters.cpp b/ydb/core/tablet/private/aggregated_counters.cpp
index 66ed751770..0416117475 100644
--- a/ydb/core/tablet/private/aggregated_counters.cpp
+++ b/ydb/core/tablet/private/aggregated_counters.cpp
@@ -642,6 +642,7 @@ void TAggregatedLabeledCounters::Recalc(ui32 idx) const {
Y_FAIL("bad aggrFunc value");
};
}
+
AggrCounters[idx] = aggrVal.first;
Ids[idx] = aggrVal.second;
}
diff --git a/ydb/core/tablet/private/labeled_db_counters.cpp b/ydb/core/tablet/private/labeled_db_counters.cpp
index ddf01f5682..f24ccfba7b 100644
--- a/ydb/core/tablet/private/labeled_db_counters.cpp
+++ b/ydb/core/tablet/private/labeled_db_counters.cpp
@@ -101,6 +101,7 @@ void TDbLabeledCounters::FromProto(NKikimr::NSysView::TDbServiceCounters& counte
// that's why we iterate the group in reverse order
// this comes from: ydb/core/persqueue/user_info.h:310 (TUserInfo::TUserInfo)
std::reverse(groups.begin(), groups.end());
+
for (size_t i = 0; i < groups.size(); ++i) {
if (i != 1) {
countersGroup = countersGroup->GetSubgroup(groupNames[i], groups[i]);
diff --git a/ydb/core/tablet/private/labeled_db_counters.h b/ydb/core/tablet/private/labeled_db_counters.h
index 13808848b0..8359381c73 100644
--- a/ydb/core/tablet/private/labeled_db_counters.h
+++ b/ydb/core/tablet/private/labeled_db_counters.h
@@ -9,6 +9,7 @@
namespace NKikimr::NPrivate {
+
class TPQCounters : public ILabeledCounters {
protected:
TConcurrentRWHashMap<TString, TAutoPtr<TAggregatedLabeledCounters>, 256> LabeledCountersByGroup;
diff --git a/ydb/library/persqueue/tests/counters.cpp b/ydb/library/persqueue/tests/counters.cpp
index 2cddb6a157..ac600b0219 100644
--- a/ydb/library/persqueue/tests/counters.cpp
+++ b/ydb/library/persqueue/tests/counters.cpp
@@ -87,6 +87,7 @@ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, co
}
NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
+ const TString& databasePath,
const TString& cloudId, const TString& databaseId,
const TString& folderId, const TString& topicName,
const TString& consumer, const TString& host,
@@ -96,6 +97,7 @@ NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
TStringBuilder queryBuilder;
queryBuilder <<
"/counters/counters=" << counters <<
+ "/database=" << databasePath <<
"/cloud_id=" << cloudId <<
"/folder_id=" << folderId <<
"/database_id=" << databaseId <<
diff --git a/ydb/library/persqueue/tests/counters.h b/ydb/library/persqueue/tests/counters.h
index 588215b7ae..35f211491a 100644
--- a/ydb/library/persqueue/tests/counters.h
+++ b/ydb/library/persqueue/tests/counters.h
@@ -16,6 +16,7 @@ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, co
const TString& client, const TString& consumerPath);
NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters,
+ const TString& databasePath,
const TString& cloudId, const TString& databaseId,
const TString& folderId, const TString& topicName,
const TString& consumer, const TString& host,
diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp
index ded7dbf7a8..5f78745bde 100644
--- a/ydb/library/persqueue/topic_parser/counters.cpp
+++ b/ydb/library/persqueue/topic_parser/counters.cpp
@@ -15,9 +15,9 @@ namespace NPersQueue {
->GetSubgroup("Topic", topic->GetShortClientsideName());
}
-::NMonitoring::TDynamicCounterPtr GetCountersForDataStream(::NMonitoring::TDynamicCounterPtr counters)
+::NMonitoring::TDynamicCounterPtr GetCountersForTopic(::NMonitoring::TDynamicCounterPtr counters, bool isServerless)
{
- return counters->GetSubgroup("counters", "datastreams");
+ return counters->GetSubgroup("counters", isServerless ? "datastreams_serverless" : "datastreams");
}
TVector<TPQLabelsInfo> GetLabelsForCustomCluster(const TTopicConverterPtr& topic, TString cluster)
@@ -38,8 +38,9 @@ TVector<TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic)
}
TVector<TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId,
- const TString& dbId, const TString& folderId) {
+ const TString& dbId, const TString& dbPath, const TString& folderId) {
TVector<TPQLabelsInfo> res = {
+ {{{"database", dbPath}}, {dbPath}},
{{{"cloud_id", cloudId}}, {cloudId}},
{{{"folder_id", folderId}}, {folderId}},
{{{"database_id", dbId}}, {dbId}},
diff --git a/ydb/library/persqueue/topic_parser/counters.h b/ydb/library/persqueue/topic_parser/counters.h
index 435fb61d88..5457287521 100644
--- a/ydb/library/persqueue/topic_parser/counters.h
+++ b/ydb/library/persqueue/topic_parser/counters.h
@@ -10,10 +10,11 @@ TVector<NPersQueue::TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic);
//TVector<NPersQueue::TPQLabelsInfo> GetLabelsForLegacyName(const TString& topic);
TVector<NPersQueue::TPQLabelsInfo> GetLabelsForCustomCluster(const TTopicConverterPtr& topic, TString cluster);
TVector<NPersQueue::TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId,
- const TString& dbId, const TString& folderId);
+ const TString& dbId, const TString& dbPath,
+ const TString& folderId);
::NMonitoring::TDynamicCounterPtr GetCounters(::NMonitoring::TDynamicCounterPtr counters,
const TString& subsystem,
const TTopicConverterPtr& topic);
-::NMonitoring::TDynamicCounterPtr GetCountersForDataStream(::NMonitoring::TDynamicCounterPtr counters);
+::NMonitoring::TDynamicCounterPtr GetCountersForTopic(::NMonitoring::TDynamicCounterPtr counters, bool isServerless);
} // namespace NPersQueue
diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h
index fbd7622b85..9db7e6e686 100644
--- a/ydb/services/lib/actors/type_definitions.h
+++ b/ydb/services/lib/actors/type_definitions.h
@@ -16,6 +16,8 @@ struct TTopicInitInfo {
ui64 TabletID;
TString CloudId;
TString DbId;
+ TString DbPath;
+ bool IsServerless = false;
TString FolderId;
NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
};
@@ -28,6 +30,8 @@ struct TTopicHolder {
bool ACLRequestInfly = false;
TString CloudId;
TString DbId;
+ TString DbPath;
+ bool IsServerless;
TString FolderId;
NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
NPersQueue::TDiscoveryConverterPtr DiscoveryConverter;
@@ -43,6 +47,8 @@ struct TTopicHolder {
.ACLRequestInfly = false,
.CloudId = info.CloudId,
.DbId = info.DbId,
+ .DbPath = info.DbPath,
+ .IsServerless = info.IsServerless,
.FolderId = info.FolderId,
.MeteringMode = info.MeteringMode,
.FullConverter = info.TopicNameConverter,
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
index 34e6522250..587dcf9042 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
@@ -98,6 +98,9 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(
topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId();
topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId();
topicsIter->second.MeteringMode = pqDescr.GetPQTabletConfig().GetMeteringMode();
+ topicsIter->second.DbPath = pqDescr.GetPQTabletConfig().GetYdbDatabasePath();
+ topicsIter->second.IsServerless = entry.DomainInfo->IsServerless();
+
if (!topicsIter->second.DiscoveryConverter->IsValid()) {
TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503",
topicsIter->second.DiscoveryConverter->GetPrintableString().c_str());
@@ -262,7 +265,7 @@ void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) {
TTopicInitInfoMap res;
for (auto& [name, holder] : Topics) {
res.insert(std::make_pair(name, TTopicInitInfo{
- holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId, holder.MeteringMode
+ holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.DbPath, holder.IsServerless, holder.FolderId, holder.MeteringMode
}));
}
ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res)));
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h
index 857a83d6c7..8dde6b616f 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.h
@@ -280,7 +280,7 @@ private:
void SetupCounters();
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic);
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic,
- const TString& cloudId, const TString& dbId, const TString& folderId);
+ const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId);
void ProcessReads(const TActorContext& ctx);
ui64 PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse);
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 525678c3d7..7fb6f792ce 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -809,11 +809,11 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu
template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic,
- const TString& cloudId, const TString& dbId, const TString& folderId)
+ const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId)
{
auto& topicCounters = TopicCounters[topic->GetInternalName()];
- auto subGroup = NPersQueue::GetCountersForDataStream(Counters);
- auto aggr = NPersQueue::GetLabelsForTopic(topic, cloudId, dbId, folderId);
+ auto subGroup = NPersQueue::GetCountersForTopic(Counters, isServerless);
+ auto aggr = NPersQueue::GetLabelsForTopic(topic, cloudId, dbId, dbPath, folderId);
const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}};
topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_locked_per_second"}, true, "name");
@@ -1003,7 +1003,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartit
// TODO: counters
if (NumPartitionsFromTopic[name]++ == 0) {
if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
- SetupTopicCounters(converterIter->second, it->second.CloudId, it->second.DbId, it->second.FolderId);
+ SetupTopicCounters(converterIter->second, it->second.CloudId, it->second.DbId, it->second.DbPath, it->second.IsServerless, it->second.FolderId);
} else {
SetupTopicCounters(converterIter->second);
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index b148d9cf7c..0802f35932 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -184,7 +184,7 @@ private:
void SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx);
void SetupCounters();
- void SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId);
+ void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId);
private:
std::unique_ptr<TEvStreamWriteRequest> Request;
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 4387a001f8..9f78798300 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -514,15 +514,15 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters()
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId)
+void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId)
{
if (SessionsCreated) {
return;
}
//now topic is checked, can create group for real topic, not garbage
- auto subGroup = NPersQueue::GetCountersForDataStream(Counters);
- auto aggr = NPersQueue::GetLabelsForTopic(FullConverter, cloudId, dbId, folderId);
+ auto subGroup = NPersQueue::GetCountersForTopic(Counters, isServerless);
+ auto aggr = NPersQueue::GetLabelsForTopic(FullConverter, cloudId, dbId, dbPath, folderId);
SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_created_per_second"}, true, "name");
SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_active_count"}, false, "name");
@@ -582,6 +582,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse:
if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
const auto& tabletConfig = description.GetPQTabletConfig();
SetupCounters(tabletConfig.GetYcCloudId(), tabletConfig.GetYdbDatabaseId(),
+ tabletConfig.GetYdbDatabasePath(), entry.DomainInfo->IsServerless(),
tabletConfig.GetYcFolderId());
} else {
SetupCounters();
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index dbcf9257dd..9a2a361708 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -358,8 +358,9 @@ namespace NKikimr::NPersQueueTests {
const std::set<std::string>& canonicalSensorNames,
const TString& stream, const TString& consumer,
const TString& host, const TString& shard) {
- auto counters = GetCounters1stClass(monPort, "datastreams", cloudId, databaseId,
- folderId, stream, consumer, host, shard);
+ auto counters = GetCounters1stClass(monPort, "datastreams", "%2FRoot", cloudId,
+ databaseId, folderId, stream, consumer, host,
+ shard);
const auto sensors = counters["sensors"].GetArray();
std::set<std::string> sensorNames;
std::transform(sensors.begin(), sensors.end(),