diff options
author | mokhotskii <mokhotskii@ydb.tech> | 2022-12-06 17:12:41 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@ydb.tech> | 2022-12-06 17:12:41 +0300 |
commit | 9a7839c691f4f5499275025c9ca2fbb82446799d (patch) | |
tree | 7b6ba07961017f3d1ab1e430648807cfd03f7a1b | |
parent | f95d13e616e0550ae9fe7feee7c2baa9357bc4a2 (diff) | |
download | ydb-9a7839c691f4f5499275025c9ca2fbb82446799d.tar.gz |
Introduce LabeledCounters to SysView
Introduce labeled_db counters
38 files changed, 1183 insertions, 56 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 1f654e259b8..9ae32f6cecd 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -764,7 +764,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) { userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); } - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "boostrapping " << Partition << " " << ctx.SelfID); + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "bootstrapping " << Partition << " " << ctx.SelfID); if (NewPartition) { InitComplete(ctx); diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 168942c61d4..c9993792bdb 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -303,7 +303,7 @@ struct TUserInfo { if (AppData(ctx)->Counters) { if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { LabeledCounters.Reset(new TUserLabeledCounters( - user + "|$x|" + topicConverter->GetClientsideName(), partition, *dbPath)); + user + "||" + topicConverter->GetClientsideName(), partition, *dbPath)); if (DoInternalRead) { SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, folderId); @@ -444,7 +444,7 @@ struct TUserInfo { void SetImportant(bool important) { Important = important; - if (LabeledCounters) { + if (LabeledCounters && !AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) { LabeledCounters->SetGroup(User + "/" + (important ? "1" : "0") + "/" + TopicConverter->GetClientsideName()); } } diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index dff94717e61..11fc2b18ef5 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -16,15 +16,15 @@ namespace NKikimr::NPQ { -void FillPQConfig(NActors::TTestActorRuntime& runtime, const TString& dbRoot, bool isFirstClass) { - runtime.GetAppData(0).PQConfig.SetEnabled(true); +void FillPQConfig(NKikimrPQ::TPQConfig& pqConfig, const TString& dbRoot, bool isFirstClass) { + pqConfig.SetEnabled(true); // NOTE(shmel1k@): KIKIMR-14221 - runtime.GetAppData(0).PQConfig.SetTopicsAreFirstClassCitizen(isFirstClass); - runtime.GetAppData(0).PQConfig.SetRequireCredentialsInNewProtocol(false); - runtime.GetAppData(0).PQConfig.SetRoot(dbRoot); - runtime.GetAppData(0).PQConfig.SetClusterTablePath(TStringBuilder() << dbRoot << "/Config/V2/Cluster"); - runtime.GetAppData(0).PQConfig.SetVersionTablePath(TStringBuilder() << dbRoot << "/Config/V2/Versions"); - runtime.GetAppData(0).PQConfig.MutableQuotingConfig()->SetEnableQuoting(false); + pqConfig.SetTopicsAreFirstClassCitizen(isFirstClass); + pqConfig.SetRequireCredentialsInNewProtocol(false); + pqConfig.SetRoot(dbRoot); + pqConfig.SetClusterTablePath(TStringBuilder() << dbRoot << "/Config/V2/Cluster"); + pqConfig.SetVersionTablePath(TStringBuilder() << dbRoot << "/Config/V2/Versions"); + pqConfig.MutableQuotingConfig()->SetEnableQuoting(false); } void PQTabletPrepare(const TTabletPreparationParameters& parameters, diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 64b604c0224..0a13f936ad2 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -22,7 +22,7 @@ inline constexpr static T PlainOrSoSlow(T plain, T slow) noexcept { constexpr ui32 NUM_WRITES = PlainOrSoSlow(100, 1); -void FillPQConfig(NActors::TTestActorRuntime& runtime, const TString& dbRoot, bool isFirstClass); +void FillPQConfig(NKikimrPQ::TPQConfig& pqConfig, const TString& dbRoot, bool isFirstClass); class TInitialEventsFilter : TNonCopyable { bool IsDone; @@ -127,6 +127,7 @@ struct TTestContext { Runtime->SetScheduledLimit(200); TAppPrepare appData; + appData.SetEnablePersistentQueryStats(enableDbCounters); appData.SetEnableDbCounters(enableDbCounters); SetupLogging(*Runtime); SetupTabletServices(*Runtime, &appData); @@ -135,7 +136,7 @@ struct TTestContext { CreateTestTabletInfo(TabletId, PQTabletType, TErasureType::ErasureNone), &CreatePersQueue); - FillPQConfig(*Runtime, "/Root/PQ", isFirstClass); + FillPQConfig(Runtime->GetAppData(0).PQConfig, "/Root/PQ", isFirstClass); TDispatchOptions options; options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index f3d91e1d67e..8c1bce2c57e 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -224,9 +224,18 @@ Y_UNIT_TEST(PartitionFirstClass) { }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { TFinalizer finalizer(tc); activeZone = false; + bool dbRegistered{false}; tc.Prepare(dispatchName, setup, activeZone, true, true, true); tc.Runtime->SetScheduledLimit(1000); + tc.Runtime->SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + if (event->GetTypeRewrite() == NSysView::TEvSysView::EvRegisterDbCounters) { + auto database = event.Get()->Get<NSysView::TEvSysView::TEvRegisterDbCounters>()->Database; + UNIT_ASSERT_VALUES_EQUAL(database, "/Root/PQ"); + dbRegistered = true; + } + return TTestActorRuntime::DefaultObserverFunc(runtime, event); + }); PQTabletPrepare({}, {}, tc); @@ -244,15 +253,7 @@ Y_UNIT_TEST(PartitionFirstClass) { options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters); tc.Runtime->DispatchEvents(options); } - - IActor* actorX = CreateClusterLabeledCountersAggregatorActor(tc.Edge, TTabletTypes::PersQueue); - tc.Runtime->Register(actorX); - - TAutoPtr<IEventHandle> handle; - TEvTabletCounters::TEvTabletLabeledCountersResponse *result; - result = tc.Runtime->GrabEdgeEvent<TEvTabletCounters::TEvTabletLabeledCountersResponse>(handle); - UNIT_ASSERT(result); - UNIT_ASSERT_VALUES_EQUAL(result->Record.LabeledCountersByGroupSize(), 0); + UNIT_ASSERT(dbRegistered); }); } diff --git a/ydb/core/protos/sys_view.proto b/ydb/core/protos/sys_view.proto index a3ff84b707d..234646e1245 100644 --- a/ydb/core/protos/sys_view.proto +++ b/ydb/core/protos/sys_view.proto @@ -2,6 +2,7 @@ package NKikimrSysView; option java_package = "ru.yandex.kikimr.proto"; +import "ydb/core/protos/labeled_counters.proto"; import "ydb/core/protos/tablet.proto"; message TPartitionStatsKey { @@ -519,11 +520,16 @@ message TDbGRpcProxyCounters { optional TDbCounters RequestCounters = 1; } +message TDbLabeledCounters { + optional NKikimrLabeledCounters.TTabletLabeledCounters AggregatedPerTablets = 1; +} + message TDbServiceCounters { optional TDbCounters Main = 1; repeated TDbTabletCounters TabletCounters = 2; repeated TDbGRpcCounters GRpcCounters = 3; optional TDbGRpcProxyCounters GRpcProxyCounters = 4; + repeated TDbLabeledCounters LabeledCounters = 5; } enum EDbCountersService { @@ -531,8 +537,9 @@ enum EDbCountersService { TABLETS = 2; GRPC = 3; GRPC_PROXY = 4; - RESERVED_2 = 5; + LABELED = 5; RESERVED_3 = 6; + RESERVED_4 = 7; } // node -> sysview processor tablet @@ -552,6 +559,22 @@ message TEvSendDbCountersResponse { optional uint64 Generation = 2; // confirmed generation } +message TEvSendDbLabeledCountersRequest { + message TServiceCounters { + optional EDbCountersService Service = 1; + optional TDbServiceCounters Counters = 2; + } + repeated TServiceCounters ServiceCounters = 1; + optional uint64 NodeId = 2; + optional uint64 Generation = 3; +} + +message TEvSendDbLabeledCountersResponse { + optional string Database = 1; + optional uint64 Generation = 2; // confirmed generation +} + + // ---- Top partitions message TTopPartitionsKey { diff --git a/ydb/core/sys_view/common/db_counters.h b/ydb/core/sys_view/common/db_counters.h index 0a1850e78a4..fe6b9856e2c 100644 --- a/ydb/core/sys_view/common/db_counters.h +++ b/ydb/core/sys_view/common/db_counters.h @@ -14,12 +14,14 @@ class TDbServiceCounters { using TGRpcRequestDesc = std::pair<TString, TString>; THashMap<TGRpcRequestDesc, NKikimrSysView::TDbGRpcCounters*> ByGRpcRequest; + THashMap<TString, NKikimrSysView::TDbLabeledCounters*> ByGroupName; public: void Swap(TDbServiceCounters& other) { ProtoCounters.Swap(&other.ProtoCounters); ByTabletType.swap(other.ByTabletType); ByGRpcRequest.swap(other.ByGRpcRequest); + ByGroupName.swap(other.ByGroupName); } NKikimrSysView::TDbServiceCounters& Proto() { return ProtoCounters; } @@ -73,6 +75,35 @@ public: return counters; } + + NKikimrSysView::TDbLabeledCounters* FindLabeledCounters(const TString& groupName) const + { + if (auto it = ByGroupName.find(groupName); it != ByGroupName.end()) { + return it->second; + } + return {}; + } + + NKikimrSysView::TDbLabeledCounters* FindOrAddLabeledCounters(const TString& groupName) + { + if (auto it = ByGroupName.find(groupName); it != ByGroupName.end()) { + return it->second; + } + + auto* counters = ProtoCounters.AddLabeledCounters(); + auto lCounters = counters->MutableAggregatedPerTablets(); + lCounters->SetGroup(groupName); + lCounters->SetDelimiter("|"); + ByGroupName[groupName] = counters; + + return counters; + } + + void ClearLabeledCounters() + { + ProtoCounters.Clear(); + ByGroupName.clear(); + } }; } // NSysView diff --git a/ydb/core/sys_view/common/events.h b/ydb/core/sys_view/common/events.h index fb4347db616..b170395a54a 100644 --- a/ydb/core/sys_view/common/events.h +++ b/ydb/core/sys_view/common/events.h @@ -59,6 +59,8 @@ struct TEvSysView { EvRegisterDbCounters, EvSendDbCountersRequest, EvSendDbCountersResponse, + EvSendDbLabeledCountersRequest, + EvSendDbLabeledCountersResponse, EvWatchDatabase, EvUpdateTtlStats, @@ -330,6 +332,18 @@ struct TEvSysView { EvSendDbCountersResponse> {}; + struct TEvSendDbLabeledCountersRequest : public TEventPB< + TEvSendDbLabeledCountersRequest, + NKikimrSysView::TEvSendDbLabeledCountersRequest, + EvSendDbLabeledCountersRequest> + {}; + + struct TEvSendDbLabeledCountersResponse : public TEventPB< + TEvSendDbLabeledCountersResponse, + NKikimrSysView::TEvSendDbLabeledCountersResponse, + EvSendDbLabeledCountersResponse> + {}; + struct TEvWatchDatabase : public TEventLocal< TEvWatchDatabase, EvWatchDatabase> diff --git a/ydb/core/sys_view/processor/db_counters.cpp b/ydb/core/sys_view/processor/db_counters.cpp index e2f359e4d61..c941b875ad7 100644 --- a/ydb/core/sys_view/processor/db_counters.cpp +++ b/ydb/core/sys_view/processor/db_counters.cpp @@ -5,7 +5,8 @@ #include <ydb/core/grpc_services/counters/counters.h> #include <ydb/core/grpc_services/counters/proxy_counters.h> #include <ydb/core/kqp/counters/kqp_counters.h> -#include <ydb/core/tablet/tablet_counters_aggregator.h> +#include <ydb/core/tablet/labeled_db_counters.h> +#include <ydb/core/tablet/labeled_counters_merger.h> #include <ydb/core/tablet_flat/flat_executor_counters.h> namespace NKikimr { @@ -120,6 +121,10 @@ static void SwapMaxCounters(NKikimrSysView::TDbCounters* dst, NKikimrSysView::TD dst->SetCumulativeCount(src.GetCumulativeCount()); }; +static void SwapLabeledCounters(NKikimrLabeledCounters::TTabletLabeledCounters* dst, NKikimrLabeledCounters::TTabletLabeledCounters& src) { + dst->MutableLabeledCounter()->Swap(src.MutableLabeledCounter()); +}; + static void ResetSimpleCounters(NKikimrSysView::TDbCounters* dst) { auto simpleSize = dst->SimpleSize(); auto* to = dst->MutableSimple(); @@ -137,6 +142,27 @@ static void ResetMaxCounters(NKikimrSysView::TDbCounters* dst) { } } +static void ResetLabeledCounters(NKikimrLabeledCounters::TTabletLabeledCounters* dst) { + auto labeledSize = dst->LabeledCounterSize(); + auto* to = dst->MutableLabeledCounter(); + for (size_t i = 0; i < labeledSize; ++i) { + auto aggrFunc = (*to)[i].GetAggregateFunc(); + switch (aggrFunc) { + case static_cast<int>(TTabletLabeledCountersBase::EAggregateFunc::EAF_MIN): + (*to)[i].SetValue(std::numeric_limits<ui64>::max()); + break; + case static_cast<int>(TTabletLabeledCountersBase::EAggregateFunc::EAF_MAX): + (*to)[i].SetValue(0); + break; + case static_cast<int>(TTabletLabeledCountersBase::EAggregateFunc::EAF_SUM): + (*to)[i].SetValue(0); + break; + default: + Y_FAIL("bad aggrFunc value"); + } + } +} + template <typename TAggrSum, typename TAggrMax> static void AggregateCounters(NKikimr::NSysView::TDbServiceCounters* dst, const NKikimrSysView::TDbServiceCounters& src) @@ -162,6 +188,24 @@ static void AggregateCounters(NKikimr::NSysView::TDbServiceCounters* dst, TAggrSum::Apply(dst->Proto().MutableGRpcProxyCounters()->MutableRequestCounters(), src.GetGRpcProxyCounters().GetRequestCounters()); } + + for (const auto& srcReq : src.GetLabeledCounters()) { + auto* dstReq = dst->FindOrAddLabeledCounters(srcReq.GetAggregatedPerTablets().GetGroup()); + if (dstReq->GetAggregatedPerTablets().GetLabeledCounter().size() < + srcReq.GetAggregatedPerTablets().GetLabeledCounter().size()) { + const ui32 n = srcReq.GetAggregatedPerTablets().GetLabeledCounter().size() - + dstReq->GetAggregatedPerTablets().GetLabeledCounter().size(); + for (ui32 i = 0; i < n; ++i) { + dstReq->MutableAggregatedPerTablets()->AddLabeledCounter(); + } + } + + for (int i = 0; i < srcReq.GetAggregatedPerTablets().GetLabeledCounter().size(); ++i) { + const auto& srcCounter = srcReq.GetAggregatedPerTablets().GetLabeledCounter(i); + auto* trgCounter = dstReq->MutableAggregatedPerTablets()->MutableLabeledCounter(i); + NKikimr::TMerger::MergeOne(srcCounter, *trgCounter); + } + } } static void AggregateIncrementalCounters(NKikimr::NSysView::TDbServiceCounters* dst, @@ -195,6 +239,11 @@ static void SwapStatefulCounters(NKikimr::NSysView::TDbServiceCounters* dst, auto* dstReq = dst->FindOrAddGRpcCounters(srcReq.GetGRpcService(), srcReq.GetGRpcRequest()); SwapSimpleCounters(dstReq->MutableRequestCounters(), *srcReq.MutableRequestCounters()); } + + for (auto& srcReq : *src.MutableLabeledCounters()) { + auto* dstReq = dst->FindOrAddLabeledCounters(srcReq.GetAggregatedPerTablets().GetGroup()); + SwapLabeledCounters(dstReq->MutableAggregatedPerTablets(), *srcReq.MutableAggregatedPerTablets()); + } } static void ResetStatefulCounters(NKikimrSysView::TDbServiceCounters* dst) { @@ -210,6 +259,9 @@ static void ResetStatefulCounters(NKikimrSysView::TDbServiceCounters* dst) { for (auto& dstReq : *dst->MutableGRpcCounters()) { ResetSimpleCounters(dstReq.MutableRequestCounters()); } + for (auto& dstReq : *dst->MutableLabeledCounters()) { + ResetLabeledCounters(dstReq.MutableAggregatedPerTablets()); + } } void TSysViewProcessor::SendNavigate() { @@ -261,6 +313,10 @@ TIntrusivePtr<IDbCounters> TSysViewProcessor::CreateCountersForService( result = NGRpcService::CreateGRpcProxyDbCounters(ExternalGroup, group); break; } + case NKikimrSysView::LABELED: { + result = NKikimr::CreateLabeledDbCounters(LabeledGroup); + break; + } default: break; } @@ -282,6 +338,13 @@ void TSysViewProcessor::AttachExternalCounters() { ->GetSubgroup("folder_id", FolderId) ->GetSubgroup("database_id", DatabaseId) ->RegisterSubgroup("host", "", ExternalGroup); + + GetServiceCounters(AppData()->Counters, "labeled", false) + ->GetSubgroup("database", Database) + ->GetSubgroup("cloud_id", CloudId) + ->GetSubgroup("folder_id", FolderId) + ->GetSubgroup("database_id", DatabaseId) + ->RegisterSubgroup("host", "", LabeledGroup); } void TSysViewProcessor::AttachInternalCounters() { @@ -303,6 +366,9 @@ void TSysViewProcessor::DetachExternalCounters() { GetServiceCounters(AppData()->Counters, "ydb_serverless", false) ->RemoveSubgroup("database", Database); + + GetServiceCounters(AppData()->Counters, "labeled", false) + ->RemoveSubgroup("database", Database); } void TSysViewProcessor::DetachInternalCounters() { @@ -374,6 +440,61 @@ void TSysViewProcessor::Handle(TEvSysView::TEvSendDbCountersRequest::TPtr& ev) { Send(ev->Sender, std::move(response)); } +void TSysViewProcessor::Handle(TEvSysView::TEvSendDbLabeledCountersRequest::TPtr& ev) { + if (!AppData()->FeatureFlags.GetEnableDbCounters()) { + return; + } + + auto& record = ev->Get()->Record; + auto nodeId = record.GetNodeId(); + + auto& state = NodeLabeledCountersStates[nodeId]; + state.FreshCount = 0; + + if (state.Generation == record.GetGeneration()) { + SVLOG_D("[" << TabletID() << "] TEvSendDbLabeledCountersRequest, known generation: " + << "node id# " << nodeId + << ", generation# " << record.GetGeneration()); + + auto response = MakeHolder<TEvSysView::TEvSendDbLabeledCountersResponse>(); + response->Record.SetDatabase(Database); + response->Record.SetGeneration(state.Generation); + Send(ev->Sender, std::move(response)); + return; + } + + state.Generation = record.GetGeneration(); + + std::unordered_set<NKikimrSysView::EDbCountersService> incomingServicesSet; + + for (auto& serviceCounters : *record.MutableServiceCounters()) { + const auto service = serviceCounters.GetService(); + incomingServicesSet.insert(service); + + auto& simpleState = state.Simple[service]; + simpleState.ClearLabeledCounters(); + SwapStatefulCounters(&simpleState, *serviceCounters.MutableCounters()); + } + + for (auto it = state.Simple.begin(); it != state.Simple.end(); ) { + if (incomingServicesSet.find(it->first) == incomingServicesSet.end()) { + it = state.Simple.erase(it); + } else { + ++it; + } + } + + SVLOG_D("[" << TabletID() << "] TEvSendDbLabeledCountersRequest: " + << "node id# " << nodeId + << ", generation# " << state.Generation + << ", request size# " << record.ByteSize()); + + auto response = MakeHolder<TEvSysView::TEvSendDbLabeledCountersResponse>(); + response->Record.SetDatabase(Database); + response->Record.SetGeneration(state.Generation); + Send(ev->Sender, std::move(response)); +} + void TSysViewProcessor::Handle(TEvPrivate::TEvApplyCounters::TPtr&) { for (auto& [_, counters] : AggregatedCountersState) { ResetStatefulCounters(&counters.Proto()); @@ -391,7 +512,6 @@ void TSysViewProcessor::Handle(TEvPrivate::TEvApplyCounters::TPtr&) { } ++it; } - for (auto& [service, aggrCounters] : AggregatedCountersState) { TIntrusivePtr<IDbCounters> counters; if (auto it = Counters.find(service); it != Counters.end()) { @@ -411,6 +531,43 @@ void TSysViewProcessor::Handle(TEvPrivate::TEvApplyCounters::TPtr&) { ScheduleApplyCounters(); } +void TSysViewProcessor::Handle(TEvPrivate::TEvApplyLabeledCounters::TPtr&) { + for (auto& [_, counters] : AggregatedLabeledState) { + ResetStatefulCounters(&counters.Proto()); + } + + for (auto it = NodeLabeledCountersStates.begin(); it != NodeLabeledCountersStates.end(); ) { + auto& state = it->second; + if (state.FreshCount > 1) { + it = NodeLabeledCountersStates.erase(it); + continue; + } + ++state.FreshCount; + for (const auto& [service, counters] : state.Simple) { + AggregateStatefulCounters(&AggregatedLabeledState[service], counters.Proto()); + } + ++it; + } + + for (auto& [service, aggrCounters] : AggregatedLabeledState) { + TIntrusivePtr<IDbCounters> counters; + if (auto it = Counters.find(service); it != Counters.end()) { + counters = it->second; + } else { + counters = CreateCountersForService(service); + } + if (!counters) { + continue; + } + counters->FromProto(aggrCounters); + } + + SVLOG_D("[" << TabletID() << "] TEvApplyLabeledCounters: " + << "services count# " << AggregatedLabeledState.size()); + + ScheduleApplyLabeledCounters(); +} + void TSysViewProcessor::Handle(TEvPrivate::TEvSendNavigate::TPtr&) { SendNavigate(); } diff --git a/ydb/core/sys_view/processor/processor_impl.cpp b/ydb/core/sys_view/processor/processor_impl.cpp index f3c5437984b..1bd8bb5b055 100644 --- a/ydb/core/sys_view/processor/processor_impl.cpp +++ b/ydb/core/sys_view/processor/processor_impl.cpp @@ -14,6 +14,7 @@ TSysViewProcessor::TSysViewProcessor(const TActorId& tablet, TTabletStorageInfo* , TotalInterval(TDuration::Seconds(processorMode == EProcessorMode::FAST ? 1 : 60)) , CollectInterval(TotalInterval / 2) , ExternalGroup(new ::NMonitoring::TDynamicCounters) + , LabeledGroup(new ::NMonitoring::TDynamicCounters) { InternalGroups["kqp_serverless"] = new ::NMonitoring::TDynamicCounters; InternalGroups["tablets_serverless"] = new ::NMonitoring::TDynamicCounters; @@ -214,6 +215,10 @@ void TSysViewProcessor::ScheduleApplyCounters() { Schedule(ProcessCountersInterval, new TEvPrivate::TEvApplyCounters); } +void TSysViewProcessor::ScheduleApplyLabeledCounters() { + Schedule(ProcessLabeledCountersInterval, new TEvPrivate::TEvApplyLabeledCounters); +} + void TSysViewProcessor::ScheduleSendNavigate() { Schedule(SendNavigateInterval, new TEvPrivate::TEvSendNavigate); } diff --git a/ydb/core/sys_view/processor/processor_impl.h b/ydb/core/sys_view/processor/processor_impl.h index 52c7f6ad5c1..904f545ad8d 100644 --- a/ydb/core/sys_view/processor/processor_impl.h +++ b/ydb/core/sys_view/processor/processor_impl.h @@ -50,6 +50,7 @@ private: EvSendRequests, EvProcess, EvApplyCounters, + EvApplyLabeledCounters, EvSendNavigate, EvEnd }; @@ -64,6 +65,8 @@ private: struct TEvApplyCounters : public TEventLocal<TEvApplyCounters, EvApplyCounters> {}; + struct TEvApplyLabeledCounters : public TEventLocal<TEvApplyLabeledCounters, EvApplyLabeledCounters> {}; + struct TEvSendNavigate : public TEventLocal<TEvSendNavigate, EvSendNavigate> {}; }; @@ -119,7 +122,9 @@ private: void Handle(TEvSysView::TEvGetTopPartitionsRequest::TPtr& ev); void Handle(TEvSysView::TEvSendDbCountersRequest::TPtr& ev); + void Handle(TEvSysView::TEvSendDbLabeledCountersRequest::TPtr& ev); void Handle(TEvPrivate::TEvApplyCounters::TPtr& ev); + void Handle(TEvPrivate::TEvApplyLabeledCounters::TPtr& ev); void Handle(TEvPrivate::TEvSendNavigate::TPtr& ev); void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev); @@ -148,6 +153,7 @@ private: void ScheduleCollect(); void ScheduleSendRequests(); void ScheduleApplyCounters(); + void ScheduleApplyLabeledCounters(); void ScheduleSendNavigate(); template <typename TSchema, typename TMap> @@ -231,7 +237,9 @@ private: hFunc(TEvSysView::TEvSendTopPartitions, Handle); hFunc(TEvSysView::TEvGetTopPartitionsRequest, Handle); hFunc(TEvSysView::TEvSendDbCountersRequest, Handle); + hFunc(TEvSysView::TEvSendDbLabeledCountersRequest, Handle); hFunc(TEvPrivate::TEvApplyCounters, Handle); + hFunc(TEvPrivate::TEvApplyLabeledCounters, Handle); hFunc(TEvPrivate::TEvSendNavigate, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); @@ -265,6 +273,8 @@ private: static constexpr size_t BatchSizeLimit = 4 << 20; // interval of db counters processing static constexpr TDuration ProcessCountersInterval = TDuration::Seconds(5); + // interval of db labeled counters processing + static constexpr TDuration ProcessLabeledCountersInterval = TDuration::Minutes(1); // interval of sending next navigate request static constexpr TDuration SendNavigateInterval = TDuration::Seconds(5); @@ -357,6 +367,7 @@ private: TString DatabaseId; ::NMonitoring::TDynamicCounterPtr ExternalGroup; + ::NMonitoring::TDynamicCounterPtr LabeledGroup; std::unordered_map<TString, ::NMonitoring::TDynamicCounterPtr> InternalGroups; using TDbCountersServiceMap = std::unordered_map<NKikimrSysView::EDbCountersService, @@ -368,7 +379,9 @@ private: size_t FreshCount = 0; }; std::unordered_map<TNodeId, TNodeCountersState> NodeCountersStates; + std::unordered_map<TNodeId, TNodeCountersState> NodeLabeledCountersStates; TDbCountersServiceMap AggregatedCountersState; + TDbCountersServiceMap AggregatedLabeledState; std::unordered_map<NKikimrSysView::EDbCountersService, TIntrusivePtr<IDbCounters>> Counters; }; diff --git a/ydb/core/sys_view/processor/tx_init.cpp b/ydb/core/sys_view/processor/tx_init.cpp index 71a5a135819..11ddac07553 100644 --- a/ydb/core/sys_view/processor/tx_init.cpp +++ b/ydb/core/sys_view/processor/tx_init.cpp @@ -461,6 +461,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase { if (AppData()->FeatureFlags.GetEnableDbCounters()) { Self->ScheduleApplyCounters(); + Self->ScheduleApplyLabeledCounters(); Self->SendNavigate(); } diff --git a/ydb/core/sys_view/service/sysview_service.cpp b/ydb/core/sys_view/service/sysview_service.cpp index 92e826cd7fa..8514ff61825 100644 --- a/ydb/core/sys_view/service/sysview_service.cpp +++ b/ydb/core/sys_view/service/sysview_service.cpp @@ -8,6 +8,7 @@ #include <ydb/core/base/path.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/tablet/tablet_counters_aggregator.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -354,10 +355,19 @@ public: } if (AppData()->FeatureFlags.GetEnableDbCounters()) { - auto intervalSize = ProcessCountersInterval.MicroSeconds(); - auto deadline = (Now().MicroSeconds() / intervalSize + 1) * intervalSize; - deadline += RandomNumber<ui64>(intervalSize / 5); - Schedule(TInstant::MicroSeconds(deadline), new TEvPrivate::TEvProcessCounters()); + { + auto intervalSize = ProcessCountersInterval.MicroSeconds(); + auto deadline = (TInstant::Now().MicroSeconds() / intervalSize + 1) * intervalSize; + deadline += RandomNumber<ui64>(intervalSize / 5); + Schedule(TInstant::MicroSeconds(deadline), new TEvPrivate::TEvProcessCounters()); + } + + { + auto intervalSize = ProcessLabeledCountersInterval.MicroSeconds(); + auto deadline = (TInstant::Now().MicroSeconds() / intervalSize + 1) * intervalSize; + deadline += RandomNumber<ui64>(intervalSize / 5); + Schedule(TInstant::MicroSeconds(deadline), new TEvPrivate::TEvProcessLabeledCounters()); + } auto callback = MakeIntrusive<TServiceDbWatcherCallback>(ctx.ActorSystem()); DbWatcherActorId = ctx.Register(CreateDbWatcherActor(callback)); @@ -379,9 +389,11 @@ public: hFunc(TEvPrivate::TEvProcessInterval, Handle); hFunc(TEvPrivate::TEvSendSummary, Handle); hFunc(TEvPrivate::TEvProcessCounters, Handle); + hFunc(TEvPrivate::TEvProcessLabeledCounters, Handle); hFunc(TEvPrivate::TEvRemoveDatabase, Handle); hFunc(TEvSysView::TEvRegisterDbCounters, Handle); hFunc(TEvSysView::TEvSendDbCountersResponse, Handle); + hFunc(TEvSysView::TEvSendDbLabeledCountersResponse, Handle); hFunc(TEvSysView::TEvGetIntervalMetricsRequest, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); @@ -398,6 +410,7 @@ private: EvProcessInterval = EventSpaceBegin(TEvents::ES_PRIVATE), EvSendSummary, EvProcessCounters, + EvProcessLabeledCounters, EvRemoveDatabase, EvEnd }; @@ -423,6 +436,9 @@ private: struct TEvProcessCounters : public TEventLocal<TEvProcessCounters, EvProcessCounters> { }; + struct TEvProcessLabeledCounters : public TEventLocal<TEvProcessLabeledCounters, EvProcessLabeledCounters> { + }; + struct TEvRemoveDatabase : public TEventLocal<TEvRemoveDatabase, EvRemoveDatabase> { TString Database; TPathId PathId; @@ -565,15 +581,18 @@ private: Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); } + template <typename T> + requires std::is_same_v<T, TEvSysView::TEvSendDbCountersRequest> || + std::is_same_v<T, TEvSysView::TEvSendDbLabeledCountersRequest> void SendCounters(const TString& database) { auto processorId = GetProcessorId(database); if (!processorId) { return; } - auto& dbCounters = DatabaseCounters[database]; - - auto sendEv = MakeHolder<TEvSysView::TEvSendDbCountersRequest>(); + constexpr bool isLabeled = std::is_same<T, TEvSysView::TEvSendDbLabeledCountersRequest>::value; + auto& dbCounters = isLabeled ? DatabaseLabeledCounters[database] : DatabaseCounters[database]; + auto sendEv = MakeHolder<T>(); auto& record = sendEv->Record; if (dbCounters.IsConfirmed) { @@ -595,7 +614,11 @@ private: serviceCounters->SetService(service); auto* diff = serviceCounters->MutableCounters(); - CalculateCountersDiff(diff, state.Current, state.Confirmed); + if (isLabeled) { + diff->CopyFrom(state.Current.Proto()); + } else { + CalculateCountersDiff(diff, state.Current, state.Confirmed); + } } SVLOG_D("Send counters: " @@ -604,7 +627,8 @@ private: << ", database# " << database << ", generation# " << record.GetGeneration() << ", node id# " << record.GetNodeId() - << ", is retrying# " << dbCounters.IsRetrying); + << ", is retrying# " << dbCounters.IsRetrying + << ", is labeled# " << isLabeled); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(sendEv.Release(), processorId, true), @@ -642,6 +666,23 @@ private: it->second.States[service].Counters = counters; } + void RegisterDbLabeledCounters(const TString& database, NKikimrSysView::EDbCountersService service, + TIntrusivePtr<IDbCounters> counters) + { + auto [it, inserted] = DatabaseLabeledCounters.try_emplace(database, TDbCounters()); + if (inserted) { + if (ProcessorIds.find(database) == ProcessorIds.end()) { + RequestProcessorId(database); + } + + if (DbWatcherActorId) { + auto evWatch = MakeHolder<NSysView::TEvSysView::TEvWatchDatabase>(database); + Send(DbWatcherActorId, evWatch.Release()); + } + } + it->second.States[service].Counters = counters; + } + void Handle(TEvPrivate::TEvSendSummary::TPtr& ev) { auto prevIntervalEnd = IntervalEnd - TotalInterval; auto intervalEnd = ev->Get()->IntervalEnd; @@ -726,12 +767,23 @@ private: << "service id# " << SelfId()); for (auto& [database, dbCounters] : DatabaseCounters) { - SendCounters(database); + SendCounters<TEvSysView::TEvSendDbCountersRequest>(database); } Schedule(ProcessCountersInterval, new TEvPrivate::TEvProcessCounters()); } + void Handle(TEvPrivate::TEvProcessLabeledCounters::TPtr&) { + SVLOG_D("Handle TEvPrivate::TEvProcessLabeledCounters: " + << "service id# " << SelfId()); + + for (auto& [database, dbCounters] : DatabaseLabeledCounters) { + SendCounters<TEvSysView::TEvSendDbLabeledCountersRequest>(database); + } + + Schedule(ProcessLabeledCountersInterval, new TEvPrivate::TEvProcessLabeledCounters()); + } + void Handle(TEvPrivate::TEvRemoveDatabase::TPtr& ev) { auto database = ev->Get()->Database; auto pathId = ev->Get()->PathId; @@ -749,6 +801,7 @@ private: ProcessorIds.erase(database); Attempts.erase(database); DatabaseCounters.erase(database); + DatabaseLabeledCounters.erase(database); UnresolvedTabletCounters.erase(pathId); } @@ -781,7 +834,7 @@ private: } if (dbCounters.IsRetrying) { - SendCounters(database); + SendCounters<TEvSysView::TEvSendDbCountersRequest>(database); } SVLOG_D("Handle TEvSysView::TEvSendDbCountersResponse: " @@ -790,6 +843,36 @@ private: << ", generation# " << generation); } + void Handle(TEvSysView::TEvSendDbLabeledCountersResponse::TPtr& ev) { + const auto& record = ev->Get()->Record; + const auto& database = record.GetDatabase(); + const auto generation = record.GetGeneration(); + + auto it = DatabaseLabeledCounters.find(database); + if (it == DatabaseLabeledCounters.end()) { + return; + } + + auto& dbCounters = it->second; + if (generation != dbCounters.Generation) { + return; + } + + dbCounters.IsConfirmed = true; + for (auto& [_, state] : dbCounters.States) { + state.Confirmed.Swap(state.Current); + } + + if (dbCounters.IsRetrying) { + SendCounters<TEvSysView::TEvSendDbLabeledCountersRequest>(database); + } + + SVLOG_D("Handle TEvSysView::TEvSendDbLabeledCountersResponse: " + << "service id# " << SelfId() + << ", database# " << database + << ", generation# " << generation); + } + void Handle(TEvSysView::TEvRegisterDbCounters::TPtr& ev) { const auto service = ev->Get()->Service; @@ -803,6 +886,15 @@ private: << ", path id# " << pathId << ", service# " << (int)service); + } else if (service == NKikimrSysView::LABELED) { + const auto& database = ev->Get()->Database; + RegisterDbLabeledCounters(database, service, ev->Get()->Counters); + + SVLOG_D("Handle TEvSysView::TEvRegisterDbLabeledCounters: " + << "service id# " << SelfId() + << ", database# " << database + << ", service# " << (int)service); + } else { // register by database name const auto& database = ev->Get()->Database; RegisterDbCounters(database, service, ev->Get()->Counters); @@ -1105,6 +1197,7 @@ private: }; std::unordered_map<TString, TDbCounters> DatabaseCounters; + std::unordered_map<TString, TDbCounters> DatabaseLabeledCounters; THashMap<TPathId, TIntrusivePtr<IDbCounters>> UnresolvedTabletCounters; TActorId DbWatcherActorId; @@ -1115,6 +1208,7 @@ private: static constexpr size_t SummaryRetryAttempts = 5; static constexpr TDuration ProcessCountersInterval = TDuration::Seconds(5); + static constexpr TDuration ProcessLabeledCountersInterval = TDuration::Minutes(1); }; THolder<IActor> CreateSysViewService( diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp index 898e4cb8c28..2579cf026d4 100644 --- a/ydb/core/sys_view/ut_common.cpp +++ b/ydb/core/sys_view/ut_common.cpp @@ -1,4 +1,5 @@ #include "ut_common.h" +#include <ydb/core/persqueue/ut/common/pq_ut_common.h> namespace NKikimr { namespace NSysView { @@ -25,7 +26,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString return subdomain; } -TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool enableSVP) { +TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 pqTabletsN, bool enableSVP) { auto mbusPort = PortManager.GetPort(); auto grpcPort = PortManager.GetPort(); @@ -64,6 +65,11 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool Client->InitRootScheme("Root"); + if (pqTabletsN) { + NKikimr::NPQ::FillPQConfig(Settings->PQConfig, "/Root/PQ", true); + PqTabletIds = Server->StartPQTablets(pqTabletsN); + } + Endpoint = "localhost:" + ToString(grpcPort); DriverConfig = NYdb::TDriverConfig().SetEndpoint(Endpoint); Driver = MakeHolder<NYdb::TDriver>(DriverConfig); diff --git a/ydb/core/sys_view/ut_common.h b/ydb/core/sys_view/ut_common.h index e142b969095..0c25bfab589 100644 --- a/ydb/core/sys_view/ut_common.h +++ b/ydb/core/sys_view/ut_common.h @@ -19,7 +19,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings( class TTestEnv { public: TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, ui32 storagePools = 0, - bool enableSVP = false); + ui32 pqTabletsN = 0, bool enableSVP = false); ~TTestEnv(); Tests::TServer& GetServer() const { @@ -42,6 +42,14 @@ public: return Endpoint; } + const Tests::TServerSettings::TPtr GetSettings() const { + return Settings; + } + + const TVector<ui64>& GetPqTabletIds() const { + return PqTabletIds; + } + TStoragePools GetPools() const; TStoragePools CreatePoolsForTenant(const TString& tenant); @@ -57,6 +65,7 @@ private: TString Endpoint; NYdb::TDriverConfig DriverConfig; THolder<NYdb::TDriver> Driver; + TVector<ui64> PqTabletIds; }; } // NSysView diff --git a/ydb/core/sys_view/ut_counters.cpp b/ydb/core/sys_view/ut_counters.cpp index 51bc0c49650..66623084656 100644 --- a/ydb/core/sys_view/ut_counters.cpp +++ b/ydb/core/sys_view/ut_counters.cpp @@ -75,7 +75,7 @@ void CreateDatabasesAndTables(TTestEnv& env) { Y_UNIT_TEST_SUITE(DbCounters) { Y_UNIT_TEST(TabletsSimple) { - TTestEnv env(1, 2, 0, true); + TTestEnv env(1, 2, 0, 0, true); CreateDatabasesAndTables(env); for (size_t iter = 0; iter < 30; ++iter) { diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index 898272e5caa..502b9174e41 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -1174,7 +1174,7 @@ Y_UNIT_TEST_SUITE(SystemView) { auto nowUs = TInstant::Now().MicroSeconds(); - TTestEnv env(1, 4, 0, true); + TTestEnv env(1, 4, 0, 0, true); CreateTenantsAndTables(env); TTableClient client(env.GetDriver()); @@ -1226,7 +1226,7 @@ Y_UNIT_TEST_SUITE(SystemView) { constexpr ui64 partitionCount = 5; - TTestEnv env(1, 4, 0, true); + TTestEnv env(1, 4, 0, 0, true); CreateTenantsAndTables(env, true, partitionCount); TTableClient client(env.GetDriver()); @@ -1256,7 +1256,7 @@ Y_UNIT_TEST_SUITE(SystemView) { constexpr ui64 partitionCount = 5; - TTestEnv env(1, 4, 0, true); + TTestEnv env(1, 4, 0, 0, true); CreateTenantsAndTables(env, true, partitionCount); TTableClient client(env.GetDriver()); diff --git a/ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt b/ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt index 7331e727891..d2ef408e870 100644 --- a/ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt +++ b/ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-sys_view-ut_kqp PUBLIC cpp-testing-unittest cpp-yson-node kqp-ut-common + persqueue-ut-common core-testlib-default cpp-client-draft ) @@ -38,6 +39,7 @@ target_sources(ydb-core-sys_view-ut_kqp PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_kqp.cpp ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_labeled.cpp ) add_test( NAME diff --git a/ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt b/ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt index e61f991e75c..d3d8170c6a8 100644 --- a/ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt +++ b/ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-sys_view-ut_kqp PUBLIC cpp-testing-unittest cpp-yson-node kqp-ut-common + persqueue-ut-common core-testlib-default cpp-client-draft ) @@ -40,6 +41,7 @@ target_sources(ydb-core-sys_view-ut_kqp PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_kqp.cpp ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_labeled.cpp ) add_test( NAME diff --git a/ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt b/ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt index 4e7d32c305e..0b13142c672 100644 --- a/ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt +++ b/ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-core-sys_view-ut_kqp PUBLIC cpp-testing-unittest cpp-yson-node kqp-ut-common + persqueue-ut-common core-testlib-default cpp-client-draft ) @@ -42,6 +43,7 @@ target_sources(ydb-core-sys_view-ut_kqp PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_kqp.cpp ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_labeled.cpp ) add_test( NAME diff --git a/ydb/core/sys_view/ut_labeled.cpp b/ydb/core/sys_view/ut_labeled.cpp new file mode 100644 index 00000000000..2ef24fdd2d0 --- /dev/null +++ b/ydb/core/sys_view/ut_labeled.cpp @@ -0,0 +1,331 @@ +#include "ut_common.h" + +#include "ut_common.h" + +#include <ydb/core/base/counters.h> +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/core/persqueue/ut/common/pq_ut_common.h> + +namespace NKikimr { +namespace NSysView { + +using namespace NYdb; +using namespace NYdb::NTable; +using namespace NYdb::NScheme; + +const ui32 partitionsN = 32; +const TString topicName = "topic"; + + +namespace { + +void CreateDatabase(TTestEnv& env, const TString& databaseName) { + auto subdomain = GetSubDomainDeclareSettings(databaseName); + UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, + env.GetClient().CreateExtSubdomain("/Root", subdomain)); + + env.GetTenants().Run("/Root/" + databaseName, 1); + + auto subdomainSettings = GetSubDomainDefaultSettings(databaseName, env.GetPools()); + subdomainSettings.SetExternalSysViewProcessor(true); + subdomainSettings.SetExternalSchemeShard(true); + UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, + env.GetClient().AlterExtSubdomain("/Root", subdomainSettings)); +} + +bool CheckCounter(::NMonitoring::TDynamicCounterPtr group, const char* sensorName, ui32 refValue, + bool isDerivative) { + auto value = group->GetNamedCounter("name", sensorName, isDerivative)->Val(); + return (value == refValue); +} + +bool CheckLtCounter(::NMonitoring::TDynamicCounterPtr group, const char* sensorName, ui32 refValue, + bool isDerivative) { + auto value = group->GetNamedCounter("name", sensorName, isDerivative)->Val(); + return (value <= refValue); +} + +bool CheckLabeledCounters(::NMonitoring::TDynamicCounterPtr databaseGroup, const TString& dbId, + std::function<bool(::NMonitoring::TDynamicCounterPtr)> particularCountersCheck) { + bool isGood{true}; + Y_UNUSED(dbId); + auto topicGroup = databaseGroup + ->GetSubgroup("cloud_id", "") + ->GetSubgroup("folder_id", "") + ->GetSubgroup("database_id", "") + ->GetSubgroup("host", "") + ->GetSubgroup("topic", topicName); + { + { + TStringStream ss; + topicGroup->OutputHtml(ss); + Cerr << ss.Str() << Endl; + } + + isGood &= particularCountersCheck(topicGroup); + } + + return isGood; +} + +void GetCounters(TTestEnv& env, const TString& databaseName, const TString& databasePath, + std::function<bool(::NMonitoring::TDynamicCounterPtr)> particularCountersCheck) { + for (size_t iter = 0; iter < 30; ++iter) { + Cerr << "iteration " << iter << Endl; + + bool checkDb = false; + + for (ui32 nodeId = 0; nodeId < env.GetServer().GetRuntime()->GetNodeCount(); ++nodeId) { + auto counters = env.GetServer().GetRuntime()->GetAppData(nodeId).Counters; + auto labeledGroup = GetServiceCounters(counters, "labeled", false); + Y_VERIFY(labeledGroup); + + auto databaseGroup = labeledGroup->FindSubgroup("database", databasePath); + if (databaseGroup) { + checkDb = CheckLabeledCounters(databaseGroup, databaseName, particularCountersCheck); + } + } + + if (checkDb) { + return; + } + + Sleep(TDuration::Seconds(5)); + } + UNIT_ASSERT_C(false, "out of iterations"); +} + +} // namespace + +Y_UNIT_TEST_SUITE(LabeledDbCounters) { + + Y_UNIT_TEST(OneTablet) { + TTestEnv env(1, 2, 0, 1, true); + const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; + const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; + auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false); + isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false); + isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false); + + return isGood; + }; + + CreateDatabase(env, databaseName); + NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(), + env.GetPqTabletIds()[0], edge); + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + + Y_UNIT_TEST(OneTabletRestart) { + TTestEnv env(1, 2, 0, 1, true); + const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; + const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; + auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); + + CreateDatabase(env, databaseName); + NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(), + env.GetPqTabletIds()[0], edge); + + { + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + { + TStringStream ss; + topicGroup->OutputHtml(ss); + Cerr << ss.Str() << Endl; + } + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false); + isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false); + isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false); + + return isGood; + }; + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + + env.GetServer().GetRuntime()->Register(CreateTabletKiller(env.GetPqTabletIds()[0])); + + { + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckLtCounter(topicGroup, "topic.max_partition_uptime_milliseconds", + TDuration::Minutes(1).MilliSeconds() + 200, false); + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false); + return isGood; + }; + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + } + + Y_UNIT_TEST(TwoTablets) { + TTestEnv env(1, 2, 0, 2, true); + const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; + const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false); + isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false); + isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false); + + return isGood; + }; + + CreateDatabase(env, databaseName); + for (auto& tbId : env.GetPqTabletIds()) { + NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(), + tbId, env.GetServer().GetRuntime()->AllocateEdgeActor()); + } + + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + + Y_UNIT_TEST(TwoTabletsKillOneTablet) { + TTestEnv env(1, 2, 0, 2, true); + const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; + const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; + auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); + CreateDatabase(env, databaseName); + for (auto& tbId : env.GetPqTabletIds()) { + NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(), + tbId, edge); + } + + { + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false); + isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false); + isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false); + + return isGood; + }; + + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + + for (ui32 i = 0; i < env.GetServer().StaticNodes() + env.GetServer().DynamicNodes(); i++) { + env.GetClient().MarkNodeInHive(env.GetServer().GetRuntime(), i, false); + } + env.GetServer().GetRuntime()->Register(CreateTabletKiller(env.GetPqTabletIds()[0])); + + { + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false); + + return isGood; + }; + + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + } + + Y_UNIT_TEST(TwoTabletsDisconnectOneNode) { + TTestEnv env(1, 2, 0, 2, true); + const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; + const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; + auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); + CreateDatabase(env, databaseName); + for (auto& tbId : env.GetPqTabletIds()) { + NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(), + tbId, edge); + } + + { + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false); + isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false); + isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false); + + return isGood; + }; + + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + + env.GetServer().GetRuntime()->DisconnectNodes(0, 1, false); + env.GetServer().GetRuntime()->DisconnectNodes(0, 2, false); + + { + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false); + + return isGood; + }; + + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + } + + Y_UNIT_TEST(TwoTabletsDisconnectOneNodeHardWay) { + TTestEnv env(1, 2, 0, 2, true); + const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; + const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; + auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); + CreateDatabase(env, databaseName); + for (auto& tbId : env.GetPqTabletIds()) { + NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(), + tbId, edge); + } + + { + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false); + isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false); + isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false); + + return isGood; + }; + + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + + for (ui32 i = 0; i < env.GetServer().StaticNodes() + env.GetServer().DynamicNodes(); i++) { + env.GetClient().MarkNodeInHive(env.GetServer().GetRuntime(), i, false); + } + + env.GetServer().GetRuntime()->DisconnectNodes(0, 1, true); + env.GetServer().GetRuntime()->DisconnectNodes(0, 2, true); + + { + auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { + bool isGood{true}; + + isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false); + isGood &= CheckCounter(topicGroup, "topic.partitions.total_count", partitionsN, false); + return isGood; + }; + + Sleep(TDuration::Minutes(1)); + GetCounters(env, databaseName, databasePath, check); + } + } +} + +} // NSysView +} // NKikimr diff --git a/ydb/core/tablet/CMakeLists.txt b/ydb/core/tablet/CMakeLists.txt index 80834d9cfc9..ec3f900097d 100644 --- a/ydb/core/tablet/CMakeLists.txt +++ b/ydb/core/tablet/CMakeLists.txt @@ -33,6 +33,7 @@ target_link_libraries(ydb-core-tablet PUBLIC target_sources(ydb-core-tablet PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet/bootstrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet/labeled_counters_merger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet/labeled_db_counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet/node_tablet_monitor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet/node_whiteboard.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet/pipe_tracker.cpp @@ -58,4 +59,5 @@ target_sources(ydb-core-tablet PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet/tablet_sys.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet/tablet_tracing_signals.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet/private/aggregated_counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet/private/labeled_db_counters.cpp ) diff --git a/ydb/core/tablet/labeled_counters_merger.h b/ydb/core/tablet/labeled_counters_merger.h index c3c1a9653a1..097696b9d27 100644 --- a/ydb/core/tablet/labeled_counters_merger.h +++ b/ydb/core/tablet/labeled_counters_merger.h @@ -46,6 +46,7 @@ public: target.SetValue(value); target.SetType(type); target.SetAggregateFunc(func); + target.SetNameId(source.GetNameId()); } } diff --git a/ydb/core/tablet/labeled_db_counters.cpp b/ydb/core/tablet/labeled_db_counters.cpp new file mode 100644 index 00000000000..1b4e1b86163 --- /dev/null +++ b/ydb/core/tablet/labeled_db_counters.cpp @@ -0,0 +1,12 @@ +#include "labeled_db_counters.h" +#include "private/labeled_db_counters.h" + + +namespace NKikimr { + +TIntrusivePtr<NSysView::IDbCounters> CreateLabeledDbCounters( + ::NMonitoring::TDynamicCounterPtr externalGroup) { + return MakeIntrusive<NPrivate::TDbLabeledCounters>(externalGroup); +} + +} // namespace NKikimr diff --git a/ydb/core/tablet/labeled_db_counters.h b/ydb/core/tablet/labeled_db_counters.h new file mode 100644 index 00000000000..123f1b9ad75 --- /dev/null +++ b/ydb/core/tablet/labeled_db_counters.h @@ -0,0 +1,20 @@ +#pragma once + +#include <util/generic/ptr.h> +#include "tablet_counters.h" + + +namespace NKikimr { + +class ILabeledCounters : public virtual TThrRefBase { +public: + using TPtr = TIntrusivePtr<ILabeledCounters>; + + virtual void Apply(ui64 TabletID, const NKikimr::TTabletLabeledCountersBase* labeledCounters) = 0; + virtual void ForgetTablet(ui64 tabletID) = 0; + virtual void UseDatabase(const TString& database) { Y_UNUSED(database); } +}; + +TIntrusivePtr<NSysView::IDbCounters> CreateLabeledDbCounters(::NMonitoring::TDynamicCounterPtr externalGroup); + +} // namespace NKikimr diff --git a/ydb/core/tablet/private/aggregated_counters.cpp b/ydb/core/tablet/private/aggregated_counters.cpp index 2dccbf9e323..66ed7517708 100644 --- a/ydb/core/tablet/private/aggregated_counters.cpp +++ b/ydb/core/tablet/private/aggregated_counters.cpp @@ -574,6 +574,43 @@ void TAggregatedLabeledCounters::FillGetRequestV2( } } +void TAggregatedLabeledCounters::ToProto(NKikimrLabeledCounters::TTabletLabeledCounters& labeledCounters) const { + if (Changed) { + for (ui32 idx : xrange(CountersByTabletId.size())) { + Recalc(idx); + } + Changed = false; + } + ui32 updatedCount{0}; + for (ui32 i = 0; i < Size(); ++i) { + if (strlen(Names[i]) != 0) { + if (labeledCounters.LabeledCounterSize() <= updatedCount) { + labeledCounters.AddLabeledCounter(); + } + auto& labeledCounter = *labeledCounters.MutableLabeledCounter(updatedCount); + labeledCounter.SetValue(GetValue(i)); + labeledCounter.SetNameId(i); + labeledCounter.SetAggregateFunc(NKikimr::TLabeledCounterOptions::EAggregateFunc(AggrFunc[i])); + labeledCounter.SetType(NKikimr::TLabeledCounterOptions::ECounterType(Types[i])); + ++updatedCount; + } + } +} + +void TAggregatedLabeledCounters::FromProto( + NMonitoring::TDynamicCounterPtr group, + const NKikimrLabeledCounters::TTabletLabeledCounters& labeledCounters) const { + for (const auto& counter : labeledCounters.GetLabeledCounter()) { + const ui32 nameId{counter.GetNameId()}; + if (strlen(Names[nameId]) != 0) { + // TODO: ASDFGS if CT_TIMELAG -> ctx.Now() - counters.GetValue + const bool derived = counter.GetType() == TLabeledCounterOptions::CT_DERIV; + auto namedCounter = group->GetNamedCounter("name", Names[nameId], derived); + *namedCounter = counter.GetValue(); + } + } +} + void TAggregatedLabeledCounters::Recalc(ui32 idx) const { Y_VERIFY(idx < Ids.size()); auto &counters = CountersByTabletId[idx]; @@ -581,7 +618,11 @@ void TAggregatedLabeledCounters::Recalc(ui32 idx) const { std::pair<ui64, ui64> aggrVal{0,0}; ui64 cntCount = counters.size(); - Y_VERIFY(cntCount > 0); + // Y_VERIFY(cntCount > 0); + if (cntCount == 0) { + return; + } + if (aggrFunc == TTabletLabeledCountersBase::EAggregateFunc::EAF_MIN) aggrVal = counters.begin()->second; diff --git a/ydb/core/tablet/private/aggregated_counters.h b/ydb/core/tablet/private/aggregated_counters.h index 24e5ab04d36..c099942d8e7 100644 --- a/ydb/core/tablet/private/aggregated_counters.h +++ b/ydb/core/tablet/private/aggregated_counters.h @@ -156,6 +156,10 @@ public: void FillGetRequestV2(NKikimr::TTabletLabeledCountersResponseContext* context, const TString& group) const; + void ToProto(NKikimrLabeledCounters::TTabletLabeledCounters& labeledCounters) const; + void FromProto(NMonitoring::TDynamicCounterPtr group, + const NKikimrLabeledCounters::TTabletLabeledCounters& labeledCounters) const; + private: // ::NMonitoring::TDynamicCounterPtr CounterGroup; diff --git a/ydb/core/tablet/private/labeled_db_counters.cpp b/ydb/core/tablet/private/labeled_db_counters.cpp new file mode 100644 index 00000000000..ddf01f56824 --- /dev/null +++ b/ydb/core/tablet/private/labeled_db_counters.cpp @@ -0,0 +1,116 @@ +#include "labeled_db_counters.h" + +#include <library/cpp/actors/core/actorsystem.h> +#include <util/string/split.h> +#include <ydb/core/sys_view/service/sysview_service.h> + +namespace NKikimr::NPrivate { + +/* +** class TPQCounters + */ + +THashMap<TString, TAutoPtr<TAggregatedLabeledCounters>> TPQCounters::LabeledCountersByGroupReference = {}; + +TPQCounters::TPQCounters(NMonitoring::TDynamicCounterPtr counters) { + Group = counters; +} + +void TPQCounters::Apply(ui64 tabletId, const NKikimr::TTabletLabeledCountersBase* labeledCounters) { + const TString group = labeledCounters->GetGroup(); + TString groupNames; + + if (!LabeledCountersByGroup.Has(group)) { + TVector<TString> rr; + StringSplitter(group).Split('|').Collect(&rr); + for (ui32 i = 0; i < rr.size(); ++i) { + if (i > 0) + groupNames += '|'; + groupNames += labeledCounters->GetGroupName(i); + } + + if (!LabeledCountersByGroupReference.contains(groupNames)) { + LabeledCountersByGroupReference.emplace(groupNames, new TAggregatedLabeledCounters( + labeledCounters->GetCounters().Size(), labeledCounters->GetAggrFuncs(), + labeledCounters->GetNames(), labeledCounters->GetTypes(), groupNames)); + } + } + + auto& el = LabeledCountersByGroup.InsertIfAbsent(group, new TAggregatedLabeledCounters( + labeledCounters->GetCounters().Size(), labeledCounters->GetAggrFuncs(), + labeledCounters->GetNames(), labeledCounters->GetTypes(), groupNames)); + + for (ui32 i = 0, N = labeledCounters->GetCounters().Size(); i < N; ++i) { + const ui64& value = labeledCounters->GetCounters()[i].Get(); + // FIXME (?): + // const ui64& id = labeledCounters->GetIds()[i].Get(); + const ui64 id = i; + el->SetValue(tabletId, i, value, id); + } +} + +void TPQCounters::ForgetTablet(ui64 tabletId) { + for (auto& bucket : LabeledCountersByGroup.Buckets) { + TWriteGuard guard(bucket.GetLock()); + auto& map = bucket.GetMap(); + for (auto iterator = map.begin(); iterator != map.end();) { + bool empty = iterator->second->ForgetTablet(tabletId); + if (empty) { + auto eraseIterator = iterator; + ++iterator; + map.erase(eraseIterator); + } else { + ++iterator; + } + } + } +} + +/* +** class TDbLabeledCounters + */ + +TDbLabeledCounters::TDbLabeledCounters() +: TPQCounters(MakeIntrusive<::NMonitoring::TDynamicCounters>()) +{} + +TDbLabeledCounters::TDbLabeledCounters(::NMonitoring::TDynamicCounterPtr counters) +: TPQCounters(counters) +{} + +void TDbLabeledCounters::ToProto(NKikimr::NSysView::TDbServiceCounters& counters) { + counters.ClearLabeledCounters(); + for (auto& bucket : LabeledCountersByGroup.Buckets) { + TWriteGuard guard(bucket.GetLock()); + for (auto& [group, labeledCounters] : bucket.GetMap()) { + auto* proto = counters.FindOrAddLabeledCounters(group); + auto* labeled = proto->MutableAggregatedPerTablets(); + labeledCounters->ToProto(*labeled); + } + } +} + +void TDbLabeledCounters::FromProto(NKikimr::NSysView::TDbServiceCounters& counters) { + for (auto& proto : *counters.Proto().MutableLabeledCounters()) { + TVector<TString> groups; + TVector<TString> groupNames = {"topic", "important", "consumer"}; + Y_VERIFY(proto.GetAggregatedPerTablets().GetDelimiter() == "|"); + StringSplitter(proto.GetAggregatedPerTablets().GetGroup()).Split('|').Collect(&groups); + auto countersGroup = Group; + // FIXME: a little hack here: we have consumer - important - topic group names in proto + // that's why we iterate the group in reverse order + // this comes from: ydb/core/persqueue/user_info.h:310 (TUserInfo::TUserInfo) + std::reverse(groups.begin(), groups.end()); + for (size_t i = 0; i < groups.size(); ++i) { + if (i != 1) { + countersGroup = countersGroup->GetSubgroup(groupNames[i], groups[i]); + } + } + const TString groupNamesStr = (groups.size() == 3) ? "client|important|topic" : "topic"; + + LabeledCountersByGroupReference[groupNamesStr]->FromProto(countersGroup, + proto.GetAggregatedPerTablets()); + } +} + +} // namespace NKikimr::NPrivate diff --git a/ydb/core/tablet/private/labeled_db_counters.h b/ydb/core/tablet/private/labeled_db_counters.h new file mode 100644 index 00000000000..13808848b0b --- /dev/null +++ b/ydb/core/tablet/private/labeled_db_counters.h @@ -0,0 +1,39 @@ +#pragma once + +#include <ydb/core/sys_view/service/db_counters.h> +#include <ydb/core/tablet/labeled_db_counters.h> +#include <ydb/core/util/concurrent_rw_hash.h> + +#include "aggregated_counters.h" + + +namespace NKikimr::NPrivate { + +class TPQCounters : public ILabeledCounters { +protected: + TConcurrentRWHashMap<TString, TAutoPtr<TAggregatedLabeledCounters>, 256> LabeledCountersByGroup; + NMonitoring::TDynamicCounterPtr Group; + +public: + using TPtr = TIntrusivePtr<TPQCounters>; + + explicit TPQCounters(NMonitoring::TDynamicCounterPtr counters); + + void Apply(ui64 tabletID, const NKikimr::TTabletLabeledCountersBase* labeledCounters) override; + void ForgetTablet(ui64 tabletID) override; + + static THashMap<TString, TAutoPtr<TAggregatedLabeledCounters>> LabeledCountersByGroupReference; +}; + +class TDbLabeledCounters : public TPQCounters, public NSysView::IDbCounters { +public: + using TPtr = TIntrusivePtr<TDbLabeledCounters>; + + TDbLabeledCounters(); + explicit TDbLabeledCounters(::NMonitoring::TDynamicCounterPtr counters); + + void ToProto(NKikimr::NSysView::TDbServiceCounters& counters) override; + void FromProto(NKikimr::NSysView::TDbServiceCounters& counters) override; +}; + +} // namespace NKikimr::NPrivate diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp index 0ebbe4aa0a4..111f530b837 100644 --- a/ydb/core/tablet/tablet_counters_aggregator.cpp +++ b/ydb/core/tablet/tablet_counters_aggregator.cpp @@ -1,7 +1,9 @@ #include "tablet_counters_aggregator.h" #include "tablet_counters_app.h" #include "labeled_counters_merger.h" +#include "labeled_db_counters.h" #include "private/aggregated_counters.h" +#include "private/labeled_db_counters.h" #include <library/cpp/actors/core/log.h> #include <ydb/core/mon/mon.h> @@ -161,6 +163,12 @@ public: } } + void ApplyLabeledDbCounters(const TString& dbName, ui64 tabletId, + const TTabletLabeledCountersBase* labeledCounters, const TActorContext& ctx) { + auto iterDbLabeled = GetLabeledDbCounters(dbName, ctx); + iterDbLabeled->Apply(tabletId, labeledCounters); + } + void ForgetTablet(ui64 tabletId, TTabletTypes::EType tabletType, TPathId tenantPathId) { AllTypes->Forget(tabletId); // and now erase from every other path @@ -172,9 +180,15 @@ public: if (auto itPath = CountersByPathId.find(tenantPathId); itPath != CountersByPathId.end()) { itPath->second->Forget(tabletId, tabletType); } - //and from all labeledCounters that could have this tablet - auto iterTabletTypeAndGroup = LabeledCountersByTabletTypeAndGroup.lower_bound(std::make_pair(tabletType, TString())); - for (; iterTabletTypeAndGroup != LabeledCountersByTabletTypeAndGroup.end() && iterTabletTypeAndGroup->first.first == tabletType; ) { + + for (auto iter = LabeledDbCounters.begin(); iter != LabeledDbCounters.end(); ++iter) { + iter->second->ForgetTablet(tabletId); + } + // and from all labeledCounters that could have this tablet + auto iterTabletTypeAndGroup = + LabeledCountersByTabletTypeAndGroup.lower_bound(std::make_pair(tabletType, TString())); + for (; iterTabletTypeAndGroup != LabeledCountersByTabletTypeAndGroup.end() && + iterTabletTypeAndGroup->first.first == tabletType; ) { bool empty = iterTabletTypeAndGroup->second->ForgetTablet(tabletId); if (empty) { iterTabletTypeAndGroup = LabeledCountersByTabletTypeAndGroup.erase(iterTabletTypeAndGroup); @@ -310,6 +324,10 @@ public: CountersByPathId.erase(pathId); } + void RemoveTabletsByDbPath(const TString& dbPath) { + LabeledDbCounters.erase(dbPath); + } + private: // subgroups class TTabletCountersForTabletType : public TThrRefBase { @@ -1049,8 +1067,8 @@ public: : ActorSystem(actorSystem) {} - void OnDatabaseRemoved(const TString&, TPathId pathId) override { - auto evRemove = MakeHolder<TEvTabletCounters::TEvRemoveDatabase>(pathId); + void OnDatabaseRemoved(const TString& dbPath, TPathId pathId) override { + auto evRemove = MakeHolder<TEvTabletCounters::TEvRemoveDatabase>(dbPath, pathId); auto aggregator = MakeTabletCountersAggregatorID(ActorSystem->NodeId, false); ActorSystem->Send(aggregator, evRemove.Release()); } @@ -1078,6 +1096,27 @@ private: return dbCounters; } + NPrivate::TDbLabeledCounters::TPtr GetLabeledDbCounters(const TString& dbName, const TActorContext& ctx) { + auto it = LabeledDbCounters.find(dbName); + if (it != LabeledDbCounters.end()) { + return it->second; + } + + auto dbCounters = MakeIntrusive<NPrivate::TDbLabeledCounters>(); + LabeledDbCounters[dbName] = dbCounters; + + auto evRegister = MakeHolder<NSysView::TEvSysView::TEvRegisterDbCounters>( + NKikimrSysView::LABELED, dbName, dbCounters); + ctx.Send(NSysView::MakeSysViewServiceID(ctx.SelfID.NodeId()), evRegister.Release()); + + if (DbWatcherActorId) { + auto evWatch = MakeHolder<NSysView::TEvSysView::TEvWatchDatabase>(dbName); + ctx.Send(DbWatcherActorId, evWatch.Release()); + } + + return dbCounters; + } + private: ::NMonitoring::TDynamicCounterPtr Counters; TTabletCountersForTabletTypePtr AllTypes; @@ -1085,6 +1124,7 @@ private: typedef THashMap<TPathId, TIntrusivePtr<TTabletCountersForDb>> TCountersByPathId; typedef TMap<TTabletTypes::EType, THolder<TTabletCountersBase>> TAppCountersByTabletType; + typedef THashMap<TString, TIntrusivePtr<NPrivate::TDbLabeledCounters>> TLabeledCountersByDbPath; typedef TMap<std::pair<TTabletTypes::EType, TString>, TAutoPtr<NPrivate::TAggregatedLabeledCounters>> TLabeledCountersByTabletTypeAndGroup; typedef THashMap<ui64, std::pair<TAutoPtr<TTabletCountersBase>, TAutoPtr<TTabletCountersBase>>> TQuietTabletCounters; @@ -1093,6 +1133,7 @@ private: TActorId DbWatcherActorId; TAppCountersByTabletType LimitedAppCounters; // without txs TYdbTabletCountersPtr YdbCounters; + TLabeledCountersByDbPath LabeledDbCounters; TLabeledCountersByTabletTypeAndGroup LabeledCountersByTabletTypeAndGroup; TQuietTabletCounters QuietTabletCounters; }; @@ -1202,8 +1243,14 @@ TTabletCountersAggregatorActor::HandleWork(TEvTabletCounters::TEvTabletAddLabele if (msg->LabeledCounters.Get()->GetDatabasePath()) { if (msg->TabletType == TTabletTypes::PersQueue) { LOG_DEBUG_S(ctx, NKikimrServices::TABLET_AGGREGATOR, - "got labeledCounters from db" << msg->LabeledCounters.Get()->GetDatabasePath()); + "got labeledCounters from db: " << msg->LabeledCounters.Get()->GetDatabasePath() << + "; tablet: " << msg->TabletID); + TabletMon->ApplyLabeledDbCounters(msg->LabeledCounters.Get()->GetDatabasePath().GetRef(), msg->TabletID, msg->LabeledCounters.Get(), ctx); } else { + LOG_ERROR_S(ctx, NKikimrServices::TABLET_AGGREGATOR, + "got labeledCounters from unknown Tablet Type: " << msg->TabletType << + "; db: " << msg->LabeledCounters.Get()->GetDatabasePath() << + "; tablet: " << msg->TabletID); return; } } else { @@ -1353,8 +1400,8 @@ TTabletCountersAggregatorActor::HandleWork(TEvTabletCounters::TEvTabletLabeledCo //////////////////////////////////////////// void TTabletCountersAggregatorActor::HandleWork(TEvTabletCounters::TEvRemoveDatabase::TPtr& ev) { - TabletMon->RemoveTabletsByPathId(ev->Get()->PathId); + TabletMon->RemoveTabletsByDbPath(ev->Get()->DbPath); } //////////////////////////////////////////// diff --git a/ydb/core/tablet/tablet_counters_aggregator.h b/ydb/core/tablet/tablet_counters_aggregator.h index 1c39b3295e8..be9e336d85c 100644 --- a/ydb/core/tablet/tablet_counters_aggregator.h +++ b/ydb/core/tablet/tablet_counters_aggregator.h @@ -105,10 +105,12 @@ struct TEvTabletCounters { }; struct TEvRemoveDatabase : public TEventLocal<TEvRemoveDatabase, EvRemoveDatabase> { + const TString DbPath; const TPathId PathId; - explicit TEvRemoveDatabase(TPathId pathId) - : PathId(pathId) + TEvRemoveDatabase(const TString& dbPath, TPathId pathId) + : DbPath(dbPath) + , PathId(pathId) {} }; diff --git a/ydb/core/tablet/tablet_counters_aggregator_ut.cpp b/ydb/core/tablet/tablet_counters_aggregator_ut.cpp index 013d292c243..f94a11f898f 100644 --- a/ydb/core/tablet/tablet_counters_aggregator_ut.cpp +++ b/ydb/core/tablet/tablet_counters_aggregator_ut.cpp @@ -1,5 +1,7 @@ #include "tablet_counters_aggregator.h" +#include "private/labeled_db_counters.h" +#include <ydb/core/base/counters.h> #include <ydb/core/testlib/basics/runtime.h> #include <ydb/core/testlib/basics/appdata.h> @@ -824,6 +826,101 @@ Y_UNIT_TEST_SUITE(TTabletLabeledCountersAggregator) { UNIT_ASSERT_VALUES_EQUAL(res[1], "cons/aaa|1|aba/caba/daba|man"); } + Y_UNIT_TEST(DbAggregation) { + TVector<TActorId> cc; + TActorId aggregatorId; + + TTestBasicRuntime runtime(1); + + runtime.Initialize(TAppPrepare().Unwrap()); + runtime.GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true); + + TActorId edge = runtime.AllocateEdgeActor(); + + runtime.SetRegistrationObserverFunc([&cc, &aggregatorId] + (TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { + TTestActorRuntime::DefaultRegistrationObserver(runtime, parentId, actorId); + if (parentId == aggregatorId) { + cc.push_back(actorId); + } + }); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); + runtime.DispatchEvents(options); + for (const auto& a : cc) { + THolder<TEvInterconnect::TEvNodesInfo> nodesInfo = MakeHolder<TEvInterconnect::TEvNodesInfo>(); + nodesInfo->Nodes.emplace_back(TEvInterconnect::TNodeInfo(1, "::", "localhost", "localhost", 1234, TNodeLocation())); + nodesInfo->Nodes.emplace_back(TEvInterconnect::TNodeInfo(2, "::", "localhost", "localhost", 1234, TNodeLocation())); + nodesInfo->Nodes.emplace_back(TEvInterconnect::TNodeInfo(3, "::", "localhost", "localhost", 1234, TNodeLocation())); + runtime.Send(new NActors::IEventHandle(a, edge, nodesInfo.Release()), 0, true); + } + + NPrivate::TDbLabeledCounters PQCounters; + + const size_t namesN{5}; + std::array<const char *, namesN> names; + names.fill(""); + names[0] = "whatever"; + names[1] = "whenever"; + std::array<const char *, namesN> groupNames; + groupNames.fill("topic"); + groupNames[1] = "user||topic"; + std::array<ui8, namesN> types; + types.fill(static_cast<ui8>(TLabeledCounterOptions::CT_SIMPLE)); + + std::array<ui8, namesN> functions; + functions.fill(static_cast<ui8>(TLabeledCounterOptions::EAF_SUM)); + functions[1] = static_cast<ui8>(TLabeledCounterOptions::EAF_MAX); + + { + NKikimr::TTabletLabeledCountersBase labeledCounters(namesN, &names[0], &types[0], &functions[0], + "some_stream", &groupNames[0], 1, "/Root/PQ1"); + labeledCounters.GetCounters()[0].Set(10); + labeledCounters.GetCounters()[1].Set(10); + PQCounters.Apply(0, &labeledCounters); + labeledCounters.GetCounters()[0].Set(11); + labeledCounters.GetCounters()[1].Set(100); + PQCounters.Apply(1, &labeledCounters); + labeledCounters.GetCounters()[0].Set(12); + labeledCounters.GetCounters()[1].Set(10); + PQCounters.Apply(2, &labeledCounters); + // SUM 33 + // MAX 100 + } + + { + NKikimr::TTabletLabeledCountersBase labeledCounters(namesN, &names[0], &types[0], &functions[0], + "some_stream", &groupNames[0], 1, "/Root/PQ2"); + labeledCounters.GetCounters()[0].Set(20); + labeledCounters.GetCounters()[1].Set(1); + PQCounters.Apply(0, &labeledCounters); + labeledCounters.GetCounters()[0].Set(21); + labeledCounters.GetCounters()[1].Set(11); + PQCounters.Apply(1, &labeledCounters); + labeledCounters.GetCounters()[0].Set(22); + labeledCounters.GetCounters()[1].Set(10); + PQCounters.Apply(2, &labeledCounters); + // SUM 63 + // MAX 11 + } + + NKikimr::NSysView::TDbServiceCounters counters; + + // Here we check that consequent calls do not interfere + for (int i = 10; i >= 0; --i) { + PQCounters.ToProto(counters); + + auto pqCounters = counters.FindOrAddLabeledCounters("some_stream"); + UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().group(), "some_stream"); + UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().delimiter(), "|"); + UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().GetLabeledCounter().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().GetLabeledCounter(0).value(), 63); + UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().GetLabeledCounter(1).value(), 11); + + PQCounters.FromProto(counters); + } + } } } diff --git a/ydb/core/tablet/tablet_counters_protobuf.h b/ydb/core/tablet/tablet_counters_protobuf.h index cc296d9ae48..ea707420e26 100644 --- a/ydb/core/tablet/tablet_counters_protobuf.h +++ b/ydb/core/tablet/tablet_counters_protobuf.h @@ -653,7 +653,7 @@ public: SimpleOpts()->GetAggregateFuncs(), group, SimpleOpts()->GetGroupNames(), id, databasePath) { TVector<TString> groups; - StringSplitter(group).Split('|').SkipEmpty().Collect(&groups); + StringSplitter(group).Split('|').Collect(&groups); Y_VERIFY(SimpleOpts()->GetGroupNamesSize() == groups.size()); } diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp index 1786eaeceb6..a940ed2083a 100644 --- a/ydb/core/testlib/basics/appdata.cpp +++ b/ydb/core/testlib/basics/appdata.cpp @@ -197,5 +197,4 @@ namespace NKikimr { { FeatureFlags.SetEnableDbCounters(value); } - } diff --git a/ydb/core/testlib/tablet_helpers.cpp b/ydb/core/testlib/tablet_helpers.cpp index f671b629a3f..41d4f3a391f 100644 --- a/ydb/core/testlib/tablet_helpers.cpp +++ b/ydb/core/testlib/tablet_helpers.cpp @@ -1210,6 +1210,8 @@ namespace NKikimr { bootstrapperActorId = Boot(ctx, type, &NSequenceShard::CreateSequenceShard, DataGroupErasure); } else if (type == TTabletTypes::ReplicationController) { bootstrapperActorId = Boot(ctx, type, &NReplication::CreateController, DataGroupErasure); + } else if (type == TTabletTypes::PersQueue) { + bootstrapperActorId = Boot(ctx, type, &CreatePersQueue, DataGroupErasure); } else { status = NKikimrProto::ERROR; } diff --git a/ydb/core/testlib/tenant_runtime.cpp b/ydb/core/testlib/tenant_runtime.cpp index 292c66d3da5..78d5d2ee455 100644 --- a/ydb/core/testlib/tenant_runtime.cpp +++ b/ydb/core/testlib/tenant_runtime.cpp @@ -23,6 +23,7 @@ #include <ydb/core/tx/tx_allocator/txallocator.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/sys_view/processor/processor.h> +#include <ydb/core/persqueue/pq.h> #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/interconnect/interconnect.h> @@ -432,6 +433,8 @@ class TFakeHive : public TActor<TFakeHive>, public TTabletExecutedFlat { bootstrapperActorId = Boot(ctx, type, &NSequenceShard::CreateSequenceShard, DataGroupErasure); } else if (type == TTabletTypes::ReplicationController) { bootstrapperActorId = Boot(ctx, type, &NReplication::CreateController, DataGroupErasure); + } else if (type == TTabletTypes::PersQueue) { + bootstrapperActorId = Boot(ctx, type, &NKikimr::CreatePersQueue, DataGroupErasure); } else { status = NKikimrProto::ERROR; } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index e29737a1921..c160e2f2b21 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -110,8 +110,6 @@ namespace NKikimr { namespace Tests { - - TServerSettings& TServerSettings::SetDomainName(const TString& value) { StoragePoolTypes.erase("test"); DomainName = value; @@ -425,6 +423,56 @@ namespace Tests { app.AddDomain(domain.Release()); } + TVector<ui64> TServer::StartPQTablets(ui32 pqTabletsN) { + auto getChannelBind = [](const TString& storagePool) { + TChannelBind bind; + bind.SetStoragePoolName(storagePool); + return bind; + }; + TVector<ui64> ids; + ids.reserve(pqTabletsN); + for (ui32 i = 0; i < pqTabletsN; ++i) { + auto tabletId = Tests::ChangeStateStorage(Tests::DummyTablet2 + i + 1, Settings->Domain); + TIntrusivePtr<TTabletStorageInfo> tabletInfo = + CreateTestTabletInfo(tabletId, TTabletTypes::PersQueue); + TIntrusivePtr<TTabletSetupInfo> setupInfo = + new TTabletSetupInfo(&CreatePersQueue, TMailboxType::Simple, 0, TMailboxType::Simple, 0); + + static TString STORAGE_POOL = "/Root:test"; + static TChannelsBindings BINDED_CHANNELS = + {getChannelBind(STORAGE_POOL), getChannelBind(STORAGE_POOL), getChannelBind(STORAGE_POOL)}; + + ui32 nodeIndex = 0; + auto ev = + MakeHolder<TEvHive::TEvCreateTablet>(tabletId, 0, TTabletTypes::PersQueue, BINDED_CHANNELS); + + TActorId senderB = Runtime->AllocateEdgeActor(nodeIndex); + ui64 hive = ChangeStateStorage(Tests::Hive, Settings->Domain); + Runtime->SendToPipe(hive, senderB, ev.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + auto createTabletReply = Runtime->GrabEdgeEventRethrow<TEvHive::TEvCreateTabletReply>(handle); + UNIT_ASSERT(createTabletReply); + auto expectedStatus = NKikimrProto::OK; + UNIT_ASSERT_EQUAL_C(createTabletReply->Record.GetStatus(), expectedStatus, + (ui32)createTabletReply->Record.GetStatus() << " != " << (ui32)expectedStatus); + UNIT_ASSERT_EQUAL_C(createTabletReply->Record.GetOwner(), tabletId, + createTabletReply->Record.GetOwner() << " != " << tabletId); + ui64 id = createTabletReply->Record.GetTabletID(); + while (true) { + auto tabletCreationResult = + Runtime->GrabEdgeEventRethrow<TEvHive::TEvTabletCreationResult>(handle); + UNIT_ASSERT(tabletCreationResult); + if (id == tabletCreationResult->Record.GetTabletID()) { + UNIT_ASSERT_EQUAL_C(tabletCreationResult->Record.GetStatus(), NKikimrProto::OK, + (ui32)tabletCreationResult->Record.GetStatus() << " != " << (ui32)NKikimrProto::OK); + break; + } + } + ids.push_back(id); + } + return ids; + } + void TServer::CreateBootstrapTablets() { const ui32 domainId = Settings->Domain; Y_VERIFY(TDomainsInfo::MakeTxAllocatorIDFixed(domainId, 1) == ChangeStateStorage(TxAllocator, domainId)); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index c7ce2bbb7c2..4bf212ca1db 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -164,6 +164,7 @@ namespace Tests { TServerSettings& SetFeatureFlags(const NKikimrConfig::TFeatureFlags& value) { FeatureFlags = value; return *this; } TServerSettings& SetCompactionConfig(const NKikimrConfig::TCompactionConfig& value) { CompactionConfig = value; return *this; } TServerSettings& SetEnableDbCounters(bool value) { FeatureFlags.SetEnableDbCounters(value); return *this; } + TServerSettings& SetEnablePersistentQueryStats(bool value) { FeatureFlags.SetEnablePersistentQueryStats(value); return *this; } TServerSettings& SetEnableYq(bool value) { EnableYq = value; return *this; } TServerSettings& SetKeepSnapshotTimeout(TDuration value) { KeepSnapshotTimeout = value; return *this; } TServerSettings& SetChangesQueueItemsLimit(ui64 value) { ChangesQueueItemsLimit = value; return *this; } @@ -257,6 +258,7 @@ namespace Tests { } } void StartDummyTablets(); + TVector<ui64> StartPQTablets(ui32 pqTabletsN); TTestActorRuntime* GetRuntime() const; const TServerSettings& GetSettings() const; const NScheme::TTypeRegistry* GetTypeRegistry(); |