diff options
author | mokhotskii <mokhotskii@ydb.tech> | 2022-12-08 20:07:47 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@ydb.tech> | 2022-12-08 20:07:47 +0300 |
commit | 960c46a34e102e33a8cd99b01068822e91eb36c1 (patch) | |
tree | 13827ff34e44873e147abf9f0ff5b19fa33d4d28 | |
parent | 59d0766faf858740aa743d112c23b9f805041bad (diff) | |
download | ydb-960c46a34e102e33a8cd99b01068822e91eb36c1.tar.gz |
Add database label to PQ metrics
Add database label to PQ metrics
26 files changed, 92 insertions, 59 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h index 39ddcba5a3..0581f19e11 100644 --- a/ydb/core/http_proxy/custom_metrics.h +++ b/ydb/core/http_proxy/custom_metrics.h @@ -18,12 +18,12 @@ void FillOutputCustomMetrics(const TProtoResult& result, const THttpRequestConte TVector<std::pair<TString, TString>> BuildLabels(const TString& method, const THttpRequestContext& httpContext, const TString& name) { if (method.empty()) { - return {{"database", httpContext.DatabaseName}, {"cloud_id", httpContext.CloudId}, + return { {"database", httpContext.DatabaseName}, {"cloud_id", httpContext.CloudId}, {"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId}, {"topic", httpContext.StreamName}, {"name", name}}; } - return {{"database", httpContext.DatabaseName}, {"method", method}, {"cloud_id", httpContext.CloudId}, + return { {"database", httpContext.DatabaseName}, {"method", method}, {"cloud_id", httpContext.CloudId}, {"folder_id", httpContext.FolderId}, {"database_id", httpContext.DatabaseId}, {"topic", httpContext.StreamName}, {"name", name}}; } diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 40a7b6f535..32b9974441 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -458,7 +458,7 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config } TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache, - const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, + const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless, const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool newPartition) : TabletID(tabletId) @@ -480,10 +480,12 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , GapSize(0) , CloudId(config.GetYcCloudId()) , DbId(config.GetYdbDatabaseId()) + , DbPath(config.GetYdbDatabasePath()) + , IsServerless(isServerless) , FolderId(config.GetYcFolderId()) , UsersInfoStorage( DCId, TabletID, TopicConverter, Partition, counters, Config, - CloudId, DbId, config.GetYdbDatabasePath(), FolderId + CloudId, DbId, config.GetYdbDatabasePath(), IsServerless, FolderId ) , ReadingTimestamp(false) , Cookie(0) @@ -851,11 +853,13 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { void TPartition::SetupStreamCounters(const TActorContext& ctx) { const auto topicName = TopicConverter->GetModernName(); auto counters = AppData(ctx)->Counters; - auto labels = NPersQueue::GetLabelsForTopic(TopicConverter, CloudId, DbId, FolderId); + auto labels = NPersQueue::GetLabelsForTopic(TopicConverter, CloudId, DbId, DbPath, FolderId); WriteBufferIsFullCounter.SetCounter( - NPersQueue::GetCountersForDataStream(counters), - {{"cloud_id", CloudId}, + NPersQueue::GetCountersForTopic(counters, IsServerless), + { + {"database", DbPath}, + {"cloud_id", CloudId}, {"folder_id", FolderId}, {"database_id", DbId}, {"topic", TopicConverter->GetFederationPath()}, @@ -864,7 +868,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {"name", "api.topic_service.stream_write.buffer_brimmed_milliseconds", true}); InputTimeLag = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForDataStream(counters), labels, + NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {{"name", "topic.write_lag_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {100, "100"}, {200, "200"}, {500, "500"}, @@ -873,7 +877,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {180'000,"180000"}, {9'999'999, "999999"}}, true)); MessageSize = THolder<NKikimr::NPQ::TPercentileCounter>(new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForDataStream(counters), labels, + NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {{"name", "topic.written_message_size_bytes"}}, "bin", TVector<std::pair<ui64, TString>>{ {1024, "1024"}, {5120, "5120"}, {10'240, "10240"}, @@ -883,17 +887,17 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { {67'108'864, "67108864"}, {999'999'999, "99999999"}}, true)); BytesWritten = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForDataStream(counters), labels, {}, + NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {}, {"api.topic_service.stream_write.bytes_per_second", "topic.written_bytes_per_second"} , true, "name"); MsgsWritten = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForDataStream(counters), labels, {}, + NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {}, {"api.topic_service.stream_write.messages_per_second", "topic.written_messages_per_second"}, true, "name"); BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter( - NPersQueue::GetCountersForDataStream(counters), labels, {}, + NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {}, {"topic.written_uncompressed_bytes_per_second"}, true, "name"); TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; @@ -907,7 +911,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForDataStream(counters), labels, + NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {{"name", "api.topic_service.stream_write.topic_throttled_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, @@ -918,7 +922,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>( new NKikimr::NPQ::TPercentileCounter( - NPersQueue::GetCountersForDataStream(counters), labels, + NPersQueue::GetCountersForTopic(counters, IsServerless), labels, {{"name", "api.topic_service.stream_write.partition_throttled_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{ {0, "0"}, {1, "1"}, {5, "5"}, {10, "10"}, diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 98aec8de7b..34c51ce7f8 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -231,7 +231,7 @@ public: } TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache, - const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, + const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless, const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool newPartition = false); @@ -441,6 +441,8 @@ private: TString CloudId; TString DbId; + TString DbPath; + bool IsServerless; TString FolderId; TUsersInfoStorage UsersInfoStorage; diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index d78a6cc03b..a5f1c35a79 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -658,7 +658,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) if (Partitions.find(partitionId) == Partitions.end()) { Partitions.emplace(partitionId, TPartitionInfo( ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter, - IsLocalDC, DCId, Config, *Counters, + IsLocalDC, DCId, IsServerless, Config, *Counters, true)), GetPartitionKeyRange(partition), true, @@ -789,7 +789,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& const auto partitionId = partition.GetPartitionId(); Partitions.emplace(partitionId, TPartitionInfo( ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter, - IsLocalDC, DCId, Config, *Counters, + IsLocalDC, DCId, IsServerless, Config, *Counters, false)), GetPartitionKeyRange(partition), false, @@ -2145,6 +2145,9 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info) void TPersQueue::CreatedHook(const TActorContext& ctx) { + + IsServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe + ctx.Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(ctx.SelfID.NodeId())); } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 2b9c9cbe97..c3fdd43337 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -150,6 +150,7 @@ private: NPersQueue::TTopicConverterPtr TopicConverter; bool IsLocalDC; TString DCId; + bool IsServerless = false; TVector<NScheme::TTypeInfo> KeySchema; NKikimrPQ::TPQTabletConfig Config; diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp index 09434b8228..6cdb214c5d 100644 --- a/ydb/core/persqueue/user_info.cpp +++ b/ydb/core/persqueue/user_info.cpp @@ -38,6 +38,7 @@ TUsersInfoStorage::TUsersInfoStorage( const TString& cloudId, const TString& dbId, const TString& dbPath, + const bool isServerless, const TString& folderId ) : DCId(std::move(dcId)) @@ -48,6 +49,7 @@ TUsersInfoStorage::TUsersInfoStorage( , CloudId(cloudId) , DbId(dbId) , DbPath(dbPath) + , IsServerless(isServerless) , FolderId(folderId) , CurReadRuleGeneration(0) { @@ -177,11 +179,10 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx, bool meterRead = userServiceType.empty() || userServiceType == defaultServiceType; - TMaybe<TString> dbPath = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() ? TMaybe<TString>(DbPath) : Nothing(); return { ctx, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition, - session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, CloudId, DbId, dbPath, FolderId, + session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, CloudId, DbId, DbPath, IsServerless, FolderId, meterRead, burst, speed }; } diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index d9559779c9..7e936e9940 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -266,7 +266,7 @@ struct TUserInfo { const ui64 readRuleGeneration, const bool important, const NPersQueue::TTopicConverterPtr& topicConverter, const ui32 partition, const TString &session, ui32 gen, ui32 step, i64 offset, const ui64 readOffsetRewindSum, const TString& dcId, TInstant readFromTimestamp, - const TString& cloudId, const TString& dbId, const TMaybe<TString>& dbPath, const TString& folderId, bool meterRead, + const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId, bool meterRead, ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000 ) : ReadSpeedLimiter(std::move(readSpeedLimiter)) @@ -303,10 +303,10 @@ struct TUserInfo { if (AppData(ctx)->Counters) { if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { LabeledCounters.Reset(new TUserLabeledCounters( - user + "||" + topicConverter->GetClientsideName(), partition, *dbPath)); + user + "||" + topicConverter->GetClientsideName(), partition, dbPath)); if (DoInternalRead) { - SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, folderId); + SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, dbPath, isServerless, folderId); } } else { LabeledCounters.Reset(new TUserLabeledCounters( @@ -320,11 +320,11 @@ struct TUserInfo { void SetupStreamCounters( const TActorContext& ctx, const TString& dcId, const TString& partition, - const TString& cloudId, const TString& dbId, const TString& folderId + const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId ) { - auto subgroup = NPersQueue::GetCountersForDataStream(AppData(ctx)->Counters); + auto subgroup = NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless); auto aggregates = - NPersQueue::GetLabelsForTopic(TopicConverter, cloudId, dbId, folderId); + NPersQueue::GetLabelsForTopic(TopicConverter, cloudId, dbId, dbPath, folderId); BytesRead = TMultiCounter(subgroup, aggregates, {{"consumer", User}}, @@ -336,13 +336,13 @@ struct TUserInfo { "topic.read_messages_per_second"}, true, "name"); Counter.SetCounter(subgroup, - {{"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId}, + {{"database", dbPath}, {"cloud_id", cloudId}, {"folder_id", folderId}, {"database_id", dbId}, {"topic", TopicConverter->GetFederationPath()}, {"consumer", User}, {"host", dcId}, {"partition", partition}}, {"name", "topic.awaiting_consume_milliseconds", true}); ReadTimeLag.reset(new TPercentileCounter( - NPersQueue::GetCountersForDataStream(AppData(ctx)->Counters), aggregates, + NPersQueue::GetCountersForTopic(AppData(ctx)->Counters, isServerless), aggregates, {{"consumer", User}, {"name", "topic.read_lag_milliseconds"}}, "bin", TVector<std::pair<ui64, TString>>{{100, "100"}, {200, "200"}, {500, "500"}, {1000, "1000"}, {2000, "2000"}, @@ -487,7 +487,7 @@ class TUsersInfoStorage { public: TUsersInfoStorage(TString dcId, ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, ui32 partition, const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config, - const TString& CloudId, const TString& DbId, const TString& DbPath, const TString& FolderId); + const TString& CloudId, const TString& DbId, const TString& DbPath, const bool isServerless, const TString& FolderId); void Init(TActorId tabletActor, TActorId partitionActor); @@ -542,6 +542,7 @@ private: TString CloudId; TString DbId; TString DbPath; + bool IsServerless; TString FolderId; mutable ui64 CurReadRuleGeneration; }; diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html index 7c39ad45a5..f04c71f0c9 100644 --- a/ydb/core/persqueue/ut/resources/counters_datastreams.html +++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html @@ -1,10 +1,10 @@ <pre> topic=topic: - name=api.topic_service.stream_write.bytes_per_second: 2700 - name=api.topic_service.stream_write.messages_per_second: 150 - name=topic.written_bytes_per_second: 2700 - name=topic.written_messages_per_second: 150 - name=topic.written_uncompressed_bytes_per_second: 1350 + name=api.topic_service.stream_write.bytes_per_second: 3240 + name=api.topic_service.stream_write.messages_per_second: 180 + name=topic.written_bytes_per_second: 3240 + name=topic.written_messages_per_second: 180 + name=topic.written_uncompressed_bytes_per_second: 1620 consumer=user: name=api.topic_service.stream_read.bytes_per_second: 0 @@ -42,7 +42,7 @@ topic=topic: name=api.topic_service.stream_write.buffer_brimmed_milliseconds: 0 name=api.topic_service.stream_write.partition_throttled_milliseconds: - bin=0: 150 + bin=0: 180 bin=1: 0 bin=10: 0 bin=100: 0 @@ -57,7 +57,7 @@ topic=topic: bin=999999: 0 name=topic.write_lag_milliseconds: - bin=100: 150 + bin=100: 180 bin=1000: 0 bin=10000: 0 bin=180000: 0 @@ -70,7 +70,7 @@ topic=topic: bin=999999: 0 name=topic.written_message_size_bytes: - bin=1024: 150 + bin=1024: 180 bin=10240: 0 bin=102400: 0 bin=1048576: 0 diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp index 95ad7cc2dc..5ea3c4b821 100644 --- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp +++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp @@ -162,6 +162,7 @@ void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, bool newPartitio topicConverter, true, "dcId", + false, config, *tabletCounters, newPartition); diff --git a/ydb/core/sys_view/processor/db_counters.cpp b/ydb/core/sys_view/processor/db_counters.cpp index c941b875ad..ba6d9212bd 100644 --- a/ydb/core/sys_view/processor/db_counters.cpp +++ b/ydb/core/sys_view/processor/db_counters.cpp @@ -240,6 +240,7 @@ static void SwapStatefulCounters(NKikimr::NSysView::TDbServiceCounters* dst, SwapSimpleCounters(dstReq->MutableRequestCounters(), *srcReq.MutableRequestCounters()); } + dst->ClearLabeledCounters(); for (auto& srcReq : *src.MutableLabeledCounters()) { auto* dstReq = dst->FindOrAddLabeledCounters(srcReq.GetAggregatedPerTablets().GetGroup()); SwapLabeledCounters(dstReq->MutableAggregatedPerTablets(), *srcReq.MutableAggregatedPerTablets()); @@ -339,7 +340,7 @@ void TSysViewProcessor::AttachExternalCounters() { ->GetSubgroup("database_id", DatabaseId) ->RegisterSubgroup("host", "", ExternalGroup); - GetServiceCounters(AppData()->Counters, "labeled", false) + GetServiceCounters(AppData()->Counters, "labeled_serverless", false) ->GetSubgroup("database", Database) ->GetSubgroup("cloud_id", CloudId) ->GetSubgroup("folder_id", FolderId) @@ -367,7 +368,7 @@ void TSysViewProcessor::DetachExternalCounters() { GetServiceCounters(AppData()->Counters, "ydb_serverless", false) ->RemoveSubgroup("database", Database); - GetServiceCounters(AppData()->Counters, "labeled", false) + GetServiceCounters(AppData()->Counters, "labeled_severless", false) ->RemoveSubgroup("database", Database); } @@ -472,7 +473,6 @@ void TSysViewProcessor::Handle(TEvSysView::TEvSendDbLabeledCountersRequest::TPtr incomingServicesSet.insert(service); auto& simpleState = state.Simple[service]; - simpleState.ClearLabeledCounters(); SwapStatefulCounters(&simpleState, *serviceCounters.MutableCounters()); } diff --git a/ydb/core/sys_view/processor/processor_impl.h b/ydb/core/sys_view/processor/processor_impl.h index 904f545ad8..c781ed0f21 100644 --- a/ydb/core/sys_view/processor/processor_impl.h +++ b/ydb/core/sys_view/processor/processor_impl.h @@ -274,7 +274,7 @@ private: // interval of db counters processing static constexpr TDuration ProcessCountersInterval = TDuration::Seconds(5); // interval of db labeled counters processing - static constexpr TDuration ProcessLabeledCountersInterval = TDuration::Minutes(1); + static constexpr TDuration ProcessLabeledCountersInterval = TDuration::Seconds(60); // interval of sending next navigate request static constexpr TDuration SendNavigateInterval = TDuration::Seconds(5); diff --git a/ydb/core/sys_view/ut_labeled.cpp b/ydb/core/sys_view/ut_labeled.cpp index 2ef24fdd2d..53093bc7ff 100644 --- a/ydb/core/sys_view/ut_labeled.cpp +++ b/ydb/core/sys_view/ut_labeled.cpp @@ -36,6 +36,7 @@ void CreateDatabase(TTestEnv& env, const TString& databaseName) { bool CheckCounter(::NMonitoring::TDynamicCounterPtr group, const char* sensorName, ui32 refValue, bool isDerivative) { auto value = group->GetNamedCounter("name", sensorName, isDerivative)->Val(); + Cerr << "CHECK COUNTER " << sensorName << " wait " << refValue << " got " << value << "\n"; return (value == refValue); } @@ -77,7 +78,7 @@ void GetCounters(TTestEnv& env, const TString& databaseName, const TString& data for (ui32 nodeId = 0; nodeId < env.GetServer().GetRuntime()->GetNodeCount(); ++nodeId) { auto counters = env.GetServer().GetRuntime()->GetAppData(nodeId).Counters; - auto labeledGroup = GetServiceCounters(counters, "labeled", false); + auto labeledGroup = GetServiceCounters(counters, "labeled_serverless", false); Y_VERIFY(labeledGroup); auto databaseGroup = labeledGroup->FindSubgroup("database", databasePath); @@ -231,7 +232,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { return isGood; }; - Sleep(TDuration::Minutes(1)); + Sleep(TDuration::Seconds(30)); GetCounters(env, databaseName, databasePath, check); } } diff --git a/ydb/core/tablet/private/aggregated_counters.cpp b/ydb/core/tablet/private/aggregated_counters.cpp index 66ed751770..0416117475 100644 --- a/ydb/core/tablet/private/aggregated_counters.cpp +++ b/ydb/core/tablet/private/aggregated_counters.cpp @@ -642,6 +642,7 @@ void TAggregatedLabeledCounters::Recalc(ui32 idx) const { Y_FAIL("bad aggrFunc value"); }; } + AggrCounters[idx] = aggrVal.first; Ids[idx] = aggrVal.second; } diff --git a/ydb/core/tablet/private/labeled_db_counters.cpp b/ydb/core/tablet/private/labeled_db_counters.cpp index ddf01f5682..f24ccfba7b 100644 --- a/ydb/core/tablet/private/labeled_db_counters.cpp +++ b/ydb/core/tablet/private/labeled_db_counters.cpp @@ -101,6 +101,7 @@ void TDbLabeledCounters::FromProto(NKikimr::NSysView::TDbServiceCounters& counte // that's why we iterate the group in reverse order // this comes from: ydb/core/persqueue/user_info.h:310 (TUserInfo::TUserInfo) std::reverse(groups.begin(), groups.end()); + for (size_t i = 0; i < groups.size(); ++i) { if (i != 1) { countersGroup = countersGroup->GetSubgroup(groupNames[i], groups[i]); diff --git a/ydb/core/tablet/private/labeled_db_counters.h b/ydb/core/tablet/private/labeled_db_counters.h index 13808848b0..8359381c73 100644 --- a/ydb/core/tablet/private/labeled_db_counters.h +++ b/ydb/core/tablet/private/labeled_db_counters.h @@ -9,6 +9,7 @@ namespace NKikimr::NPrivate { + class TPQCounters : public ILabeledCounters { protected: TConcurrentRWHashMap<TString, TAutoPtr<TAggregatedLabeledCounters>, 256> LabeledCountersByGroup; diff --git a/ydb/library/persqueue/tests/counters.cpp b/ydb/library/persqueue/tests/counters.cpp index 2cddb6a157..ac600b0219 100644 --- a/ydb/library/persqueue/tests/counters.cpp +++ b/ydb/library/persqueue/tests/counters.cpp @@ -87,6 +87,7 @@ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, co } NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters, + const TString& databasePath, const TString& cloudId, const TString& databaseId, const TString& folderId, const TString& topicName, const TString& consumer, const TString& host, @@ -96,6 +97,7 @@ NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters, TStringBuilder queryBuilder; queryBuilder << "/counters/counters=" << counters << + "/database=" << databasePath << "/cloud_id=" << cloudId << "/folder_id=" << folderId << "/database_id=" << databaseId << diff --git a/ydb/library/persqueue/tests/counters.h b/ydb/library/persqueue/tests/counters.h index 588215b7ae..35f211491a 100644 --- a/ydb/library/persqueue/tests/counters.h +++ b/ydb/library/persqueue/tests/counters.h @@ -16,6 +16,7 @@ NJson::TJsonValue GetClientCountersLegacy(ui16 port, const TString& counters, co const TString& client, const TString& consumerPath); NJson::TJsonValue GetCounters1stClass(ui16 port, const TString& counters, + const TString& databasePath, const TString& cloudId, const TString& databaseId, const TString& folderId, const TString& topicName, const TString& consumer, const TString& host, diff --git a/ydb/library/persqueue/topic_parser/counters.cpp b/ydb/library/persqueue/topic_parser/counters.cpp index ded7dbf7a8..5f78745bde 100644 --- a/ydb/library/persqueue/topic_parser/counters.cpp +++ b/ydb/library/persqueue/topic_parser/counters.cpp @@ -15,9 +15,9 @@ namespace NPersQueue { ->GetSubgroup("Topic", topic->GetShortClientsideName()); } -::NMonitoring::TDynamicCounterPtr GetCountersForDataStream(::NMonitoring::TDynamicCounterPtr counters) +::NMonitoring::TDynamicCounterPtr GetCountersForTopic(::NMonitoring::TDynamicCounterPtr counters, bool isServerless) { - return counters->GetSubgroup("counters", "datastreams"); + return counters->GetSubgroup("counters", isServerless ? "datastreams_serverless" : "datastreams"); } TVector<TPQLabelsInfo> GetLabelsForCustomCluster(const TTopicConverterPtr& topic, TString cluster) @@ -38,8 +38,9 @@ TVector<TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic) } TVector<TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId, - const TString& dbId, const TString& folderId) { + const TString& dbId, const TString& dbPath, const TString& folderId) { TVector<TPQLabelsInfo> res = { + {{{"database", dbPath}}, {dbPath}}, {{{"cloud_id", cloudId}}, {cloudId}}, {{{"folder_id", folderId}}, {folderId}}, {{{"database_id", dbId}}, {dbId}}, diff --git a/ydb/library/persqueue/topic_parser/counters.h b/ydb/library/persqueue/topic_parser/counters.h index 435fb61d88..5457287521 100644 --- a/ydb/library/persqueue/topic_parser/counters.h +++ b/ydb/library/persqueue/topic_parser/counters.h @@ -10,10 +10,11 @@ TVector<NPersQueue::TPQLabelsInfo> GetLabels(const TTopicConverterPtr& topic); //TVector<NPersQueue::TPQLabelsInfo> GetLabelsForLegacyName(const TString& topic); TVector<NPersQueue::TPQLabelsInfo> GetLabelsForCustomCluster(const TTopicConverterPtr& topic, TString cluster); TVector<NPersQueue::TPQLabelsInfo> GetLabelsForTopic(const TTopicConverterPtr& topic, const TString& cloudId, - const TString& dbId, const TString& folderId); + const TString& dbId, const TString& dbPath, + const TString& folderId); ::NMonitoring::TDynamicCounterPtr GetCounters(::NMonitoring::TDynamicCounterPtr counters, const TString& subsystem, const TTopicConverterPtr& topic); -::NMonitoring::TDynamicCounterPtr GetCountersForDataStream(::NMonitoring::TDynamicCounterPtr counters); +::NMonitoring::TDynamicCounterPtr GetCountersForTopic(::NMonitoring::TDynamicCounterPtr counters, bool isServerless); } // namespace NPersQueue diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h index fbd7622b85..9db7e6e686 100644 --- a/ydb/services/lib/actors/type_definitions.h +++ b/ydb/services/lib/actors/type_definitions.h @@ -16,6 +16,8 @@ struct TTopicInitInfo { ui64 TabletID; TString CloudId; TString DbId; + TString DbPath; + bool IsServerless = false; TString FolderId; NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; }; @@ -28,6 +30,8 @@ struct TTopicHolder { bool ACLRequestInfly = false; TString CloudId; TString DbId; + TString DbPath; + bool IsServerless; TString FolderId; NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; NPersQueue::TDiscoveryConverterPtr DiscoveryConverter; @@ -43,6 +47,8 @@ struct TTopicHolder { .ACLRequestInfly = false, .CloudId = info.CloudId, .DbId = info.DbId, + .DbPath = info.DbPath, + .IsServerless = info.IsServerless, .FolderId = info.FolderId, .MeteringMode = info.MeteringMode, .FullConverter = info.TopicNameConverter, diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 34e6522250..587dcf9042 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -98,6 +98,9 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse( topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId(); topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId(); topicsIter->second.MeteringMode = pqDescr.GetPQTabletConfig().GetMeteringMode(); + topicsIter->second.DbPath = pqDescr.GetPQTabletConfig().GetYdbDatabasePath(); + topicsIter->second.IsServerless = entry.DomainInfo->IsServerless(); + if (!topicsIter->second.DiscoveryConverter->IsValid()) { TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503", topicsIter->second.DiscoveryConverter->GetPrintableString().c_str()); @@ -262,7 +265,7 @@ void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) { TTopicInitInfoMap res; for (auto& [name, holder] : Topics) { res.insert(std::make_pair(name, TTopicInitInfo{ - holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId, holder.MeteringMode + holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.DbPath, holder.IsServerless, holder.FolderId, holder.MeteringMode })); } ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res))); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h index 857a83d6c7..8dde6b616f 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.h +++ b/ydb/services/persqueue_v1/actors/read_session_actor.h @@ -280,7 +280,7 @@ private: void SetupCounters(); void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic); void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, - const TString& cloudId, const TString& dbId, const TString& folderId); + const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId); void ProcessReads(const TActorContext& ctx); ui64 PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 525678c3d7..7fb6f792ce 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -809,11 +809,11 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, - const TString& cloudId, const TString& dbId, const TString& folderId) + const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId) { auto& topicCounters = TopicCounters[topic->GetInternalName()]; - auto subGroup = NPersQueue::GetCountersForDataStream(Counters); - auto aggr = NPersQueue::GetLabelsForTopic(topic, cloudId, dbId, folderId); + auto subGroup = NPersQueue::GetCountersForTopic(Counters, isServerless); + auto aggr = NPersQueue::GetLabelsForTopic(topic, cloudId, dbId, dbPath, folderId); const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}}; topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"api.topic_service.stream_read.partitions_locked_per_second"}, true, "name"); @@ -1003,7 +1003,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartit // TODO: counters if (NumPartitionsFromTopic[name]++ == 0) { if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { - SetupTopicCounters(converterIter->second, it->second.CloudId, it->second.DbId, it->second.FolderId); + SetupTopicCounters(converterIter->second, it->second.CloudId, it->second.DbId, it->second.DbPath, it->second.IsServerless, it->second.FolderId); } else { SetupTopicCounters(converterIter->second); } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index b148d9cf7c..0802f35932 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -184,7 +184,7 @@ private: void SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx); void SetupCounters(); - void SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId); + void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId); private: std::unique_ptr<TEvStreamWriteRequest> Request; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 4387a001f8..9f78798300 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -514,15 +514,15 @@ void TWriteSessionActor<UseMigrationProtocol>::SetupCounters() } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId) +void TWriteSessionActor<UseMigrationProtocol>::SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId) { if (SessionsCreated) { return; } //now topic is checked, can create group for real topic, not garbage - auto subGroup = NPersQueue::GetCountersForDataStream(Counters); - auto aggr = NPersQueue::GetLabelsForTopic(FullConverter, cloudId, dbId, folderId); + auto subGroup = NPersQueue::GetCountersForTopic(Counters, isServerless); + auto aggr = NPersQueue::GetLabelsForTopic(FullConverter, cloudId, dbId, dbPath, folderId); SessionsCreated = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_created_per_second"}, true, "name"); SessionsActive = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"api.topic_service.stream_write.sessions_active_count"}, false, "name"); @@ -582,6 +582,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse: if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { const auto& tabletConfig = description.GetPQTabletConfig(); SetupCounters(tabletConfig.GetYcCloudId(), tabletConfig.GetYdbDatabaseId(), + tabletConfig.GetYdbDatabasePath(), entry.DomainInfo->IsServerless(), tabletConfig.GetYcFolderId()); } else { SetupCounters(); diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index dbcf9257dd..9a2a361708 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -358,8 +358,9 @@ namespace NKikimr::NPersQueueTests { const std::set<std::string>& canonicalSensorNames, const TString& stream, const TString& consumer, const TString& host, const TString& shard) { - auto counters = GetCounters1stClass(monPort, "datastreams", cloudId, databaseId, - folderId, stream, consumer, host, shard); + auto counters = GetCounters1stClass(monPort, "datastreams", "%2FRoot", cloudId, + databaseId, folderId, stream, consumer, host, + shard); const auto sensors = counters["sensors"].GetArray(); std::set<std::string> sensorNames; std::transform(sensors.begin(), sensors.end(), |