aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@ydb.tech>2022-12-06 17:12:41 +0300
committermokhotskii <mokhotskii@ydb.tech>2022-12-06 17:12:41 +0300
commit9a7839c691f4f5499275025c9ca2fbb82446799d (patch)
tree7b6ba07961017f3d1ab1e430648807cfd03f7a1b
parentf95d13e616e0550ae9fe7feee7c2baa9357bc4a2 (diff)
downloadydb-9a7839c691f4f5499275025c9ca2fbb82446799d.tar.gz
Introduce LabeledCounters to SysView
Introduce labeled_db counters
-rw-r--r--ydb/core/persqueue/partition.cpp2
-rw-r--r--ydb/core/persqueue/user_info.h4
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp16
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h5
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp19
-rw-r--r--ydb/core/protos/sys_view.proto25
-rw-r--r--ydb/core/sys_view/common/db_counters.h31
-rw-r--r--ydb/core/sys_view/common/events.h14
-rw-r--r--ydb/core/sys_view/processor/db_counters.cpp161
-rw-r--r--ydb/core/sys_view/processor/processor_impl.cpp5
-rw-r--r--ydb/core/sys_view/processor/processor_impl.h13
-rw-r--r--ydb/core/sys_view/processor/tx_init.cpp1
-rw-r--r--ydb/core/sys_view/service/sysview_service.cpp116
-rw-r--r--ydb/core/sys_view/ut_common.cpp8
-rw-r--r--ydb/core/sys_view/ut_common.h11
-rw-r--r--ydb/core/sys_view/ut_counters.cpp2
-rw-r--r--ydb/core/sys_view/ut_kqp.cpp6
-rw-r--r--ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt2
-rw-r--r--ydb/core/sys_view/ut_labeled.cpp331
-rw-r--r--ydb/core/tablet/CMakeLists.txt2
-rw-r--r--ydb/core/tablet/labeled_counters_merger.h1
-rw-r--r--ydb/core/tablet/labeled_db_counters.cpp12
-rw-r--r--ydb/core/tablet/labeled_db_counters.h20
-rw-r--r--ydb/core/tablet/private/aggregated_counters.cpp43
-rw-r--r--ydb/core/tablet/private/aggregated_counters.h4
-rw-r--r--ydb/core/tablet/private/labeled_db_counters.cpp116
-rw-r--r--ydb/core/tablet/private/labeled_db_counters.h39
-rw-r--r--ydb/core/tablet/tablet_counters_aggregator.cpp61
-rw-r--r--ydb/core/tablet/tablet_counters_aggregator.h6
-rw-r--r--ydb/core/tablet/tablet_counters_aggregator_ut.cpp97
-rw-r--r--ydb/core/tablet/tablet_counters_protobuf.h2
-rw-r--r--ydb/core/testlib/basics/appdata.cpp1
-rw-r--r--ydb/core/testlib/tablet_helpers.cpp2
-rw-r--r--ydb/core/testlib/tenant_runtime.cpp3
-rw-r--r--ydb/core/testlib/test_client.cpp52
-rw-r--r--ydb/core/testlib/test_client.h2
38 files changed, 1183 insertions, 56 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 1f654e259b8..9ae32f6cecd 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -764,7 +764,7 @@ void TPartition::Bootstrap(const TActorContext& ctx) {
userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
}
- LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "boostrapping " << Partition << " " << ctx.SelfID);
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "bootstrapping " << Partition << " " << ctx.SelfID);
if (NewPartition) {
InitComplete(ctx);
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 168942c61d4..c9993792bdb 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -303,7 +303,7 @@ struct TUserInfo {
if (AppData(ctx)->Counters) {
if (AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
LabeledCounters.Reset(new TUserLabeledCounters(
- user + "|$x|" + topicConverter->GetClientsideName(), partition, *dbPath));
+ user + "||" + topicConverter->GetClientsideName(), partition, *dbPath));
if (DoInternalRead) {
SetupStreamCounters(ctx, dcId, ToString<ui32>(partition), cloudId, dbId, folderId);
@@ -444,7 +444,7 @@ struct TUserInfo {
void SetImportant(bool important)
{
Important = important;
- if (LabeledCounters) {
+ if (LabeledCounters && !AppData()->PQConfig.GetTopicsAreFirstClassCitizen()) {
LabeledCounters->SetGroup(User + "/" + (important ? "1" : "0") + "/" + TopicConverter->GetClientsideName());
}
}
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index dff94717e61..11fc2b18ef5 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -16,15 +16,15 @@
namespace NKikimr::NPQ {
-void FillPQConfig(NActors::TTestActorRuntime& runtime, const TString& dbRoot, bool isFirstClass) {
- runtime.GetAppData(0).PQConfig.SetEnabled(true);
+void FillPQConfig(NKikimrPQ::TPQConfig& pqConfig, const TString& dbRoot, bool isFirstClass) {
+ pqConfig.SetEnabled(true);
// NOTE(shmel1k@): KIKIMR-14221
- runtime.GetAppData(0).PQConfig.SetTopicsAreFirstClassCitizen(isFirstClass);
- runtime.GetAppData(0).PQConfig.SetRequireCredentialsInNewProtocol(false);
- runtime.GetAppData(0).PQConfig.SetRoot(dbRoot);
- runtime.GetAppData(0).PQConfig.SetClusterTablePath(TStringBuilder() << dbRoot << "/Config/V2/Cluster");
- runtime.GetAppData(0).PQConfig.SetVersionTablePath(TStringBuilder() << dbRoot << "/Config/V2/Versions");
- runtime.GetAppData(0).PQConfig.MutableQuotingConfig()->SetEnableQuoting(false);
+ pqConfig.SetTopicsAreFirstClassCitizen(isFirstClass);
+ pqConfig.SetRequireCredentialsInNewProtocol(false);
+ pqConfig.SetRoot(dbRoot);
+ pqConfig.SetClusterTablePath(TStringBuilder() << dbRoot << "/Config/V2/Cluster");
+ pqConfig.SetVersionTablePath(TStringBuilder() << dbRoot << "/Config/V2/Versions");
+ pqConfig.MutableQuotingConfig()->SetEnableQuoting(false);
}
void PQTabletPrepare(const TTabletPreparationParameters& parameters,
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index 64b604c0224..0a13f936ad2 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -22,7 +22,7 @@ inline constexpr static T PlainOrSoSlow(T plain, T slow) noexcept {
constexpr ui32 NUM_WRITES = PlainOrSoSlow(100, 1);
-void FillPQConfig(NActors::TTestActorRuntime& runtime, const TString& dbRoot, bool isFirstClass);
+void FillPQConfig(NKikimrPQ::TPQConfig& pqConfig, const TString& dbRoot, bool isFirstClass);
class TInitialEventsFilter : TNonCopyable {
bool IsDone;
@@ -127,6 +127,7 @@ struct TTestContext {
Runtime->SetScheduledLimit(200);
TAppPrepare appData;
+ appData.SetEnablePersistentQueryStats(enableDbCounters);
appData.SetEnableDbCounters(enableDbCounters);
SetupLogging(*Runtime);
SetupTabletServices(*Runtime, &appData);
@@ -135,7 +136,7 @@ struct TTestContext {
CreateTestTabletInfo(TabletId, PQTabletType, TErasureType::ErasureNone),
&CreatePersQueue);
- FillPQConfig(*Runtime, "/Root/PQ", isFirstClass);
+ FillPQConfig(Runtime->GetAppData(0).PQConfig, "/Root/PQ", isFirstClass);
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index f3d91e1d67e..8c1bce2c57e 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -224,9 +224,18 @@ Y_UNIT_TEST(PartitionFirstClass) {
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
TFinalizer finalizer(tc);
activeZone = false;
+ bool dbRegistered{false};
tc.Prepare(dispatchName, setup, activeZone, true, true, true);
tc.Runtime->SetScheduledLimit(1000);
+ tc.Runtime->SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ if (event->GetTypeRewrite() == NSysView::TEvSysView::EvRegisterDbCounters) {
+ auto database = event.Get()->Get<NSysView::TEvSysView::TEvRegisterDbCounters>()->Database;
+ UNIT_ASSERT_VALUES_EQUAL(database, "/Root/PQ");
+ dbRegistered = true;
+ }
+ return TTestActorRuntime::DefaultObserverFunc(runtime, event);
+ });
PQTabletPrepare({}, {}, tc);
@@ -244,15 +253,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters);
tc.Runtime->DispatchEvents(options);
}
-
- IActor* actorX = CreateClusterLabeledCountersAggregatorActor(tc.Edge, TTabletTypes::PersQueue);
- tc.Runtime->Register(actorX);
-
- TAutoPtr<IEventHandle> handle;
- TEvTabletCounters::TEvTabletLabeledCountersResponse *result;
- result = tc.Runtime->GrabEdgeEvent<TEvTabletCounters::TEvTabletLabeledCountersResponse>(handle);
- UNIT_ASSERT(result);
- UNIT_ASSERT_VALUES_EQUAL(result->Record.LabeledCountersByGroupSize(), 0);
+ UNIT_ASSERT(dbRegistered);
});
}
diff --git a/ydb/core/protos/sys_view.proto b/ydb/core/protos/sys_view.proto
index a3ff84b707d..234646e1245 100644
--- a/ydb/core/protos/sys_view.proto
+++ b/ydb/core/protos/sys_view.proto
@@ -2,6 +2,7 @@ package NKikimrSysView;
option java_package = "ru.yandex.kikimr.proto";
+import "ydb/core/protos/labeled_counters.proto";
import "ydb/core/protos/tablet.proto";
message TPartitionStatsKey {
@@ -519,11 +520,16 @@ message TDbGRpcProxyCounters {
optional TDbCounters RequestCounters = 1;
}
+message TDbLabeledCounters {
+ optional NKikimrLabeledCounters.TTabletLabeledCounters AggregatedPerTablets = 1;
+}
+
message TDbServiceCounters {
optional TDbCounters Main = 1;
repeated TDbTabletCounters TabletCounters = 2;
repeated TDbGRpcCounters GRpcCounters = 3;
optional TDbGRpcProxyCounters GRpcProxyCounters = 4;
+ repeated TDbLabeledCounters LabeledCounters = 5;
}
enum EDbCountersService {
@@ -531,8 +537,9 @@ enum EDbCountersService {
TABLETS = 2;
GRPC = 3;
GRPC_PROXY = 4;
- RESERVED_2 = 5;
+ LABELED = 5;
RESERVED_3 = 6;
+ RESERVED_4 = 7;
}
// node -> sysview processor tablet
@@ -552,6 +559,22 @@ message TEvSendDbCountersResponse {
optional uint64 Generation = 2; // confirmed generation
}
+message TEvSendDbLabeledCountersRequest {
+ message TServiceCounters {
+ optional EDbCountersService Service = 1;
+ optional TDbServiceCounters Counters = 2;
+ }
+ repeated TServiceCounters ServiceCounters = 1;
+ optional uint64 NodeId = 2;
+ optional uint64 Generation = 3;
+}
+
+message TEvSendDbLabeledCountersResponse {
+ optional string Database = 1;
+ optional uint64 Generation = 2; // confirmed generation
+}
+
+
// ---- Top partitions
message TTopPartitionsKey {
diff --git a/ydb/core/sys_view/common/db_counters.h b/ydb/core/sys_view/common/db_counters.h
index 0a1850e78a4..fe6b9856e2c 100644
--- a/ydb/core/sys_view/common/db_counters.h
+++ b/ydb/core/sys_view/common/db_counters.h
@@ -14,12 +14,14 @@ class TDbServiceCounters {
using TGRpcRequestDesc = std::pair<TString, TString>;
THashMap<TGRpcRequestDesc, NKikimrSysView::TDbGRpcCounters*> ByGRpcRequest;
+ THashMap<TString, NKikimrSysView::TDbLabeledCounters*> ByGroupName;
public:
void Swap(TDbServiceCounters& other) {
ProtoCounters.Swap(&other.ProtoCounters);
ByTabletType.swap(other.ByTabletType);
ByGRpcRequest.swap(other.ByGRpcRequest);
+ ByGroupName.swap(other.ByGroupName);
}
NKikimrSysView::TDbServiceCounters& Proto() { return ProtoCounters; }
@@ -73,6 +75,35 @@ public:
return counters;
}
+
+ NKikimrSysView::TDbLabeledCounters* FindLabeledCounters(const TString& groupName) const
+ {
+ if (auto it = ByGroupName.find(groupName); it != ByGroupName.end()) {
+ return it->second;
+ }
+ return {};
+ }
+
+ NKikimrSysView::TDbLabeledCounters* FindOrAddLabeledCounters(const TString& groupName)
+ {
+ if (auto it = ByGroupName.find(groupName); it != ByGroupName.end()) {
+ return it->second;
+ }
+
+ auto* counters = ProtoCounters.AddLabeledCounters();
+ auto lCounters = counters->MutableAggregatedPerTablets();
+ lCounters->SetGroup(groupName);
+ lCounters->SetDelimiter("|");
+ ByGroupName[groupName] = counters;
+
+ return counters;
+ }
+
+ void ClearLabeledCounters()
+ {
+ ProtoCounters.Clear();
+ ByGroupName.clear();
+ }
};
} // NSysView
diff --git a/ydb/core/sys_view/common/events.h b/ydb/core/sys_view/common/events.h
index fb4347db616..b170395a54a 100644
--- a/ydb/core/sys_view/common/events.h
+++ b/ydb/core/sys_view/common/events.h
@@ -59,6 +59,8 @@ struct TEvSysView {
EvRegisterDbCounters,
EvSendDbCountersRequest,
EvSendDbCountersResponse,
+ EvSendDbLabeledCountersRequest,
+ EvSendDbLabeledCountersResponse,
EvWatchDatabase,
EvUpdateTtlStats,
@@ -330,6 +332,18 @@ struct TEvSysView {
EvSendDbCountersResponse>
{};
+ struct TEvSendDbLabeledCountersRequest : public TEventPB<
+ TEvSendDbLabeledCountersRequest,
+ NKikimrSysView::TEvSendDbLabeledCountersRequest,
+ EvSendDbLabeledCountersRequest>
+ {};
+
+ struct TEvSendDbLabeledCountersResponse : public TEventPB<
+ TEvSendDbLabeledCountersResponse,
+ NKikimrSysView::TEvSendDbLabeledCountersResponse,
+ EvSendDbLabeledCountersResponse>
+ {};
+
struct TEvWatchDatabase : public TEventLocal<
TEvWatchDatabase,
EvWatchDatabase>
diff --git a/ydb/core/sys_view/processor/db_counters.cpp b/ydb/core/sys_view/processor/db_counters.cpp
index e2f359e4d61..c941b875ad7 100644
--- a/ydb/core/sys_view/processor/db_counters.cpp
+++ b/ydb/core/sys_view/processor/db_counters.cpp
@@ -5,7 +5,8 @@
#include <ydb/core/grpc_services/counters/counters.h>
#include <ydb/core/grpc_services/counters/proxy_counters.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
-#include <ydb/core/tablet/tablet_counters_aggregator.h>
+#include <ydb/core/tablet/labeled_db_counters.h>
+#include <ydb/core/tablet/labeled_counters_merger.h>
#include <ydb/core/tablet_flat/flat_executor_counters.h>
namespace NKikimr {
@@ -120,6 +121,10 @@ static void SwapMaxCounters(NKikimrSysView::TDbCounters* dst, NKikimrSysView::TD
dst->SetCumulativeCount(src.GetCumulativeCount());
};
+static void SwapLabeledCounters(NKikimrLabeledCounters::TTabletLabeledCounters* dst, NKikimrLabeledCounters::TTabletLabeledCounters& src) {
+ dst->MutableLabeledCounter()->Swap(src.MutableLabeledCounter());
+};
+
static void ResetSimpleCounters(NKikimrSysView::TDbCounters* dst) {
auto simpleSize = dst->SimpleSize();
auto* to = dst->MutableSimple();
@@ -137,6 +142,27 @@ static void ResetMaxCounters(NKikimrSysView::TDbCounters* dst) {
}
}
+static void ResetLabeledCounters(NKikimrLabeledCounters::TTabletLabeledCounters* dst) {
+ auto labeledSize = dst->LabeledCounterSize();
+ auto* to = dst->MutableLabeledCounter();
+ for (size_t i = 0; i < labeledSize; ++i) {
+ auto aggrFunc = (*to)[i].GetAggregateFunc();
+ switch (aggrFunc) {
+ case static_cast<int>(TTabletLabeledCountersBase::EAggregateFunc::EAF_MIN):
+ (*to)[i].SetValue(std::numeric_limits<ui64>::max());
+ break;
+ case static_cast<int>(TTabletLabeledCountersBase::EAggregateFunc::EAF_MAX):
+ (*to)[i].SetValue(0);
+ break;
+ case static_cast<int>(TTabletLabeledCountersBase::EAggregateFunc::EAF_SUM):
+ (*to)[i].SetValue(0);
+ break;
+ default:
+ Y_FAIL("bad aggrFunc value");
+ }
+ }
+}
+
template <typename TAggrSum, typename TAggrMax>
static void AggregateCounters(NKikimr::NSysView::TDbServiceCounters* dst,
const NKikimrSysView::TDbServiceCounters& src)
@@ -162,6 +188,24 @@ static void AggregateCounters(NKikimr::NSysView::TDbServiceCounters* dst,
TAggrSum::Apply(dst->Proto().MutableGRpcProxyCounters()->MutableRequestCounters(),
src.GetGRpcProxyCounters().GetRequestCounters());
}
+
+ for (const auto& srcReq : src.GetLabeledCounters()) {
+ auto* dstReq = dst->FindOrAddLabeledCounters(srcReq.GetAggregatedPerTablets().GetGroup());
+ if (dstReq->GetAggregatedPerTablets().GetLabeledCounter().size() <
+ srcReq.GetAggregatedPerTablets().GetLabeledCounter().size()) {
+ const ui32 n = srcReq.GetAggregatedPerTablets().GetLabeledCounter().size() -
+ dstReq->GetAggregatedPerTablets().GetLabeledCounter().size();
+ for (ui32 i = 0; i < n; ++i) {
+ dstReq->MutableAggregatedPerTablets()->AddLabeledCounter();
+ }
+ }
+
+ for (int i = 0; i < srcReq.GetAggregatedPerTablets().GetLabeledCounter().size(); ++i) {
+ const auto& srcCounter = srcReq.GetAggregatedPerTablets().GetLabeledCounter(i);
+ auto* trgCounter = dstReq->MutableAggregatedPerTablets()->MutableLabeledCounter(i);
+ NKikimr::TMerger::MergeOne(srcCounter, *trgCounter);
+ }
+ }
}
static void AggregateIncrementalCounters(NKikimr::NSysView::TDbServiceCounters* dst,
@@ -195,6 +239,11 @@ static void SwapStatefulCounters(NKikimr::NSysView::TDbServiceCounters* dst,
auto* dstReq = dst->FindOrAddGRpcCounters(srcReq.GetGRpcService(), srcReq.GetGRpcRequest());
SwapSimpleCounters(dstReq->MutableRequestCounters(), *srcReq.MutableRequestCounters());
}
+
+ for (auto& srcReq : *src.MutableLabeledCounters()) {
+ auto* dstReq = dst->FindOrAddLabeledCounters(srcReq.GetAggregatedPerTablets().GetGroup());
+ SwapLabeledCounters(dstReq->MutableAggregatedPerTablets(), *srcReq.MutableAggregatedPerTablets());
+ }
}
static void ResetStatefulCounters(NKikimrSysView::TDbServiceCounters* dst) {
@@ -210,6 +259,9 @@ static void ResetStatefulCounters(NKikimrSysView::TDbServiceCounters* dst) {
for (auto& dstReq : *dst->MutableGRpcCounters()) {
ResetSimpleCounters(dstReq.MutableRequestCounters());
}
+ for (auto& dstReq : *dst->MutableLabeledCounters()) {
+ ResetLabeledCounters(dstReq.MutableAggregatedPerTablets());
+ }
}
void TSysViewProcessor::SendNavigate() {
@@ -261,6 +313,10 @@ TIntrusivePtr<IDbCounters> TSysViewProcessor::CreateCountersForService(
result = NGRpcService::CreateGRpcProxyDbCounters(ExternalGroup, group);
break;
}
+ case NKikimrSysView::LABELED: {
+ result = NKikimr::CreateLabeledDbCounters(LabeledGroup);
+ break;
+ }
default:
break;
}
@@ -282,6 +338,13 @@ void TSysViewProcessor::AttachExternalCounters() {
->GetSubgroup("folder_id", FolderId)
->GetSubgroup("database_id", DatabaseId)
->RegisterSubgroup("host", "", ExternalGroup);
+
+ GetServiceCounters(AppData()->Counters, "labeled", false)
+ ->GetSubgroup("database", Database)
+ ->GetSubgroup("cloud_id", CloudId)
+ ->GetSubgroup("folder_id", FolderId)
+ ->GetSubgroup("database_id", DatabaseId)
+ ->RegisterSubgroup("host", "", LabeledGroup);
}
void TSysViewProcessor::AttachInternalCounters() {
@@ -303,6 +366,9 @@ void TSysViewProcessor::DetachExternalCounters() {
GetServiceCounters(AppData()->Counters, "ydb_serverless", false)
->RemoveSubgroup("database", Database);
+
+ GetServiceCounters(AppData()->Counters, "labeled", false)
+ ->RemoveSubgroup("database", Database);
}
void TSysViewProcessor::DetachInternalCounters() {
@@ -374,6 +440,61 @@ void TSysViewProcessor::Handle(TEvSysView::TEvSendDbCountersRequest::TPtr& ev) {
Send(ev->Sender, std::move(response));
}
+void TSysViewProcessor::Handle(TEvSysView::TEvSendDbLabeledCountersRequest::TPtr& ev) {
+ if (!AppData()->FeatureFlags.GetEnableDbCounters()) {
+ return;
+ }
+
+ auto& record = ev->Get()->Record;
+ auto nodeId = record.GetNodeId();
+
+ auto& state = NodeLabeledCountersStates[nodeId];
+ state.FreshCount = 0;
+
+ if (state.Generation == record.GetGeneration()) {
+ SVLOG_D("[" << TabletID() << "] TEvSendDbLabeledCountersRequest, known generation: "
+ << "node id# " << nodeId
+ << ", generation# " << record.GetGeneration());
+
+ auto response = MakeHolder<TEvSysView::TEvSendDbLabeledCountersResponse>();
+ response->Record.SetDatabase(Database);
+ response->Record.SetGeneration(state.Generation);
+ Send(ev->Sender, std::move(response));
+ return;
+ }
+
+ state.Generation = record.GetGeneration();
+
+ std::unordered_set<NKikimrSysView::EDbCountersService> incomingServicesSet;
+
+ for (auto& serviceCounters : *record.MutableServiceCounters()) {
+ const auto service = serviceCounters.GetService();
+ incomingServicesSet.insert(service);
+
+ auto& simpleState = state.Simple[service];
+ simpleState.ClearLabeledCounters();
+ SwapStatefulCounters(&simpleState, *serviceCounters.MutableCounters());
+ }
+
+ for (auto it = state.Simple.begin(); it != state.Simple.end(); ) {
+ if (incomingServicesSet.find(it->first) == incomingServicesSet.end()) {
+ it = state.Simple.erase(it);
+ } else {
+ ++it;
+ }
+ }
+
+ SVLOG_D("[" << TabletID() << "] TEvSendDbLabeledCountersRequest: "
+ << "node id# " << nodeId
+ << ", generation# " << state.Generation
+ << ", request size# " << record.ByteSize());
+
+ auto response = MakeHolder<TEvSysView::TEvSendDbLabeledCountersResponse>();
+ response->Record.SetDatabase(Database);
+ response->Record.SetGeneration(state.Generation);
+ Send(ev->Sender, std::move(response));
+}
+
void TSysViewProcessor::Handle(TEvPrivate::TEvApplyCounters::TPtr&) {
for (auto& [_, counters] : AggregatedCountersState) {
ResetStatefulCounters(&counters.Proto());
@@ -391,7 +512,6 @@ void TSysViewProcessor::Handle(TEvPrivate::TEvApplyCounters::TPtr&) {
}
++it;
}
-
for (auto& [service, aggrCounters] : AggregatedCountersState) {
TIntrusivePtr<IDbCounters> counters;
if (auto it = Counters.find(service); it != Counters.end()) {
@@ -411,6 +531,43 @@ void TSysViewProcessor::Handle(TEvPrivate::TEvApplyCounters::TPtr&) {
ScheduleApplyCounters();
}
+void TSysViewProcessor::Handle(TEvPrivate::TEvApplyLabeledCounters::TPtr&) {
+ for (auto& [_, counters] : AggregatedLabeledState) {
+ ResetStatefulCounters(&counters.Proto());
+ }
+
+ for (auto it = NodeLabeledCountersStates.begin(); it != NodeLabeledCountersStates.end(); ) {
+ auto& state = it->second;
+ if (state.FreshCount > 1) {
+ it = NodeLabeledCountersStates.erase(it);
+ continue;
+ }
+ ++state.FreshCount;
+ for (const auto& [service, counters] : state.Simple) {
+ AggregateStatefulCounters(&AggregatedLabeledState[service], counters.Proto());
+ }
+ ++it;
+ }
+
+ for (auto& [service, aggrCounters] : AggregatedLabeledState) {
+ TIntrusivePtr<IDbCounters> counters;
+ if (auto it = Counters.find(service); it != Counters.end()) {
+ counters = it->second;
+ } else {
+ counters = CreateCountersForService(service);
+ }
+ if (!counters) {
+ continue;
+ }
+ counters->FromProto(aggrCounters);
+ }
+
+ SVLOG_D("[" << TabletID() << "] TEvApplyLabeledCounters: "
+ << "services count# " << AggregatedLabeledState.size());
+
+ ScheduleApplyLabeledCounters();
+}
+
void TSysViewProcessor::Handle(TEvPrivate::TEvSendNavigate::TPtr&) {
SendNavigate();
}
diff --git a/ydb/core/sys_view/processor/processor_impl.cpp b/ydb/core/sys_view/processor/processor_impl.cpp
index f3c5437984b..1bd8bb5b055 100644
--- a/ydb/core/sys_view/processor/processor_impl.cpp
+++ b/ydb/core/sys_view/processor/processor_impl.cpp
@@ -14,6 +14,7 @@ TSysViewProcessor::TSysViewProcessor(const TActorId& tablet, TTabletStorageInfo*
, TotalInterval(TDuration::Seconds(processorMode == EProcessorMode::FAST ? 1 : 60))
, CollectInterval(TotalInterval / 2)
, ExternalGroup(new ::NMonitoring::TDynamicCounters)
+ , LabeledGroup(new ::NMonitoring::TDynamicCounters)
{
InternalGroups["kqp_serverless"] = new ::NMonitoring::TDynamicCounters;
InternalGroups["tablets_serverless"] = new ::NMonitoring::TDynamicCounters;
@@ -214,6 +215,10 @@ void TSysViewProcessor::ScheduleApplyCounters() {
Schedule(ProcessCountersInterval, new TEvPrivate::TEvApplyCounters);
}
+void TSysViewProcessor::ScheduleApplyLabeledCounters() {
+ Schedule(ProcessLabeledCountersInterval, new TEvPrivate::TEvApplyLabeledCounters);
+}
+
void TSysViewProcessor::ScheduleSendNavigate() {
Schedule(SendNavigateInterval, new TEvPrivate::TEvSendNavigate);
}
diff --git a/ydb/core/sys_view/processor/processor_impl.h b/ydb/core/sys_view/processor/processor_impl.h
index 52c7f6ad5c1..904f545ad8d 100644
--- a/ydb/core/sys_view/processor/processor_impl.h
+++ b/ydb/core/sys_view/processor/processor_impl.h
@@ -50,6 +50,7 @@ private:
EvSendRequests,
EvProcess,
EvApplyCounters,
+ EvApplyLabeledCounters,
EvSendNavigate,
EvEnd
};
@@ -64,6 +65,8 @@ private:
struct TEvApplyCounters : public TEventLocal<TEvApplyCounters, EvApplyCounters> {};
+ struct TEvApplyLabeledCounters : public TEventLocal<TEvApplyLabeledCounters, EvApplyLabeledCounters> {};
+
struct TEvSendNavigate : public TEventLocal<TEvSendNavigate, EvSendNavigate> {};
};
@@ -119,7 +122,9 @@ private:
void Handle(TEvSysView::TEvGetTopPartitionsRequest::TPtr& ev);
void Handle(TEvSysView::TEvSendDbCountersRequest::TPtr& ev);
+ void Handle(TEvSysView::TEvSendDbLabeledCountersRequest::TPtr& ev);
void Handle(TEvPrivate::TEvApplyCounters::TPtr& ev);
+ void Handle(TEvPrivate::TEvApplyLabeledCounters::TPtr& ev);
void Handle(TEvPrivate::TEvSendNavigate::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev);
@@ -148,6 +153,7 @@ private:
void ScheduleCollect();
void ScheduleSendRequests();
void ScheduleApplyCounters();
+ void ScheduleApplyLabeledCounters();
void ScheduleSendNavigate();
template <typename TSchema, typename TMap>
@@ -231,7 +237,9 @@ private:
hFunc(TEvSysView::TEvSendTopPartitions, Handle);
hFunc(TEvSysView::TEvGetTopPartitionsRequest, Handle);
hFunc(TEvSysView::TEvSendDbCountersRequest, Handle);
+ hFunc(TEvSysView::TEvSendDbLabeledCountersRequest, Handle);
hFunc(TEvPrivate::TEvApplyCounters, Handle);
+ hFunc(TEvPrivate::TEvApplyLabeledCounters, Handle);
hFunc(TEvPrivate::TEvSendNavigate, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
@@ -265,6 +273,8 @@ private:
static constexpr size_t BatchSizeLimit = 4 << 20;
// interval of db counters processing
static constexpr TDuration ProcessCountersInterval = TDuration::Seconds(5);
+ // interval of db labeled counters processing
+ static constexpr TDuration ProcessLabeledCountersInterval = TDuration::Minutes(1);
// interval of sending next navigate request
static constexpr TDuration SendNavigateInterval = TDuration::Seconds(5);
@@ -357,6 +367,7 @@ private:
TString DatabaseId;
::NMonitoring::TDynamicCounterPtr ExternalGroup;
+ ::NMonitoring::TDynamicCounterPtr LabeledGroup;
std::unordered_map<TString, ::NMonitoring::TDynamicCounterPtr> InternalGroups;
using TDbCountersServiceMap = std::unordered_map<NKikimrSysView::EDbCountersService,
@@ -368,7 +379,9 @@ private:
size_t FreshCount = 0;
};
std::unordered_map<TNodeId, TNodeCountersState> NodeCountersStates;
+ std::unordered_map<TNodeId, TNodeCountersState> NodeLabeledCountersStates;
TDbCountersServiceMap AggregatedCountersState;
+ TDbCountersServiceMap AggregatedLabeledState;
std::unordered_map<NKikimrSysView::EDbCountersService, TIntrusivePtr<IDbCounters>> Counters;
};
diff --git a/ydb/core/sys_view/processor/tx_init.cpp b/ydb/core/sys_view/processor/tx_init.cpp
index 71a5a135819..11ddac07553 100644
--- a/ydb/core/sys_view/processor/tx_init.cpp
+++ b/ydb/core/sys_view/processor/tx_init.cpp
@@ -461,6 +461,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
if (AppData()->FeatureFlags.GetEnableDbCounters()) {
Self->ScheduleApplyCounters();
+ Self->ScheduleApplyLabeledCounters();
Self->SendNavigate();
}
diff --git a/ydb/core/sys_view/service/sysview_service.cpp b/ydb/core/sys_view/service/sysview_service.cpp
index 92e826cd7fa..8514ff61825 100644
--- a/ydb/core/sys_view/service/sysview_service.cpp
+++ b/ydb/core/sys_view/service/sysview_service.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/base/path.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/tablet/tablet_counters_aggregator.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -354,10 +355,19 @@ public:
}
if (AppData()->FeatureFlags.GetEnableDbCounters()) {
- auto intervalSize = ProcessCountersInterval.MicroSeconds();
- auto deadline = (Now().MicroSeconds() / intervalSize + 1) * intervalSize;
- deadline += RandomNumber<ui64>(intervalSize / 5);
- Schedule(TInstant::MicroSeconds(deadline), new TEvPrivate::TEvProcessCounters());
+ {
+ auto intervalSize = ProcessCountersInterval.MicroSeconds();
+ auto deadline = (TInstant::Now().MicroSeconds() / intervalSize + 1) * intervalSize;
+ deadline += RandomNumber<ui64>(intervalSize / 5);
+ Schedule(TInstant::MicroSeconds(deadline), new TEvPrivate::TEvProcessCounters());
+ }
+
+ {
+ auto intervalSize = ProcessLabeledCountersInterval.MicroSeconds();
+ auto deadline = (TInstant::Now().MicroSeconds() / intervalSize + 1) * intervalSize;
+ deadline += RandomNumber<ui64>(intervalSize / 5);
+ Schedule(TInstant::MicroSeconds(deadline), new TEvPrivate::TEvProcessLabeledCounters());
+ }
auto callback = MakeIntrusive<TServiceDbWatcherCallback>(ctx.ActorSystem());
DbWatcherActorId = ctx.Register(CreateDbWatcherActor(callback));
@@ -379,9 +389,11 @@ public:
hFunc(TEvPrivate::TEvProcessInterval, Handle);
hFunc(TEvPrivate::TEvSendSummary, Handle);
hFunc(TEvPrivate::TEvProcessCounters, Handle);
+ hFunc(TEvPrivate::TEvProcessLabeledCounters, Handle);
hFunc(TEvPrivate::TEvRemoveDatabase, Handle);
hFunc(TEvSysView::TEvRegisterDbCounters, Handle);
hFunc(TEvSysView::TEvSendDbCountersResponse, Handle);
+ hFunc(TEvSysView::TEvSendDbLabeledCountersResponse, Handle);
hFunc(TEvSysView::TEvGetIntervalMetricsRequest, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
@@ -398,6 +410,7 @@ private:
EvProcessInterval = EventSpaceBegin(TEvents::ES_PRIVATE),
EvSendSummary,
EvProcessCounters,
+ EvProcessLabeledCounters,
EvRemoveDatabase,
EvEnd
};
@@ -423,6 +436,9 @@ private:
struct TEvProcessCounters : public TEventLocal<TEvProcessCounters, EvProcessCounters> {
};
+ struct TEvProcessLabeledCounters : public TEventLocal<TEvProcessLabeledCounters, EvProcessLabeledCounters> {
+ };
+
struct TEvRemoveDatabase : public TEventLocal<TEvRemoveDatabase, EvRemoveDatabase> {
TString Database;
TPathId PathId;
@@ -565,15 +581,18 @@ private:
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
}
+ template <typename T>
+ requires std::is_same_v<T, TEvSysView::TEvSendDbCountersRequest> ||
+ std::is_same_v<T, TEvSysView::TEvSendDbLabeledCountersRequest>
void SendCounters(const TString& database) {
auto processorId = GetProcessorId(database);
if (!processorId) {
return;
}
- auto& dbCounters = DatabaseCounters[database];
-
- auto sendEv = MakeHolder<TEvSysView::TEvSendDbCountersRequest>();
+ constexpr bool isLabeled = std::is_same<T, TEvSysView::TEvSendDbLabeledCountersRequest>::value;
+ auto& dbCounters = isLabeled ? DatabaseLabeledCounters[database] : DatabaseCounters[database];
+ auto sendEv = MakeHolder<T>();
auto& record = sendEv->Record;
if (dbCounters.IsConfirmed) {
@@ -595,7 +614,11 @@ private:
serviceCounters->SetService(service);
auto* diff = serviceCounters->MutableCounters();
- CalculateCountersDiff(diff, state.Current, state.Confirmed);
+ if (isLabeled) {
+ diff->CopyFrom(state.Current.Proto());
+ } else {
+ CalculateCountersDiff(diff, state.Current, state.Confirmed);
+ }
}
SVLOG_D("Send counters: "
@@ -604,7 +627,8 @@ private:
<< ", database# " << database
<< ", generation# " << record.GetGeneration()
<< ", node id# " << record.GetNodeId()
- << ", is retrying# " << dbCounters.IsRetrying);
+ << ", is retrying# " << dbCounters.IsRetrying
+ << ", is labeled# " << isLabeled);
Send(MakePipePeNodeCacheID(false),
new TEvPipeCache::TEvForward(sendEv.Release(), processorId, true),
@@ -642,6 +666,23 @@ private:
it->second.States[service].Counters = counters;
}
+ void RegisterDbLabeledCounters(const TString& database, NKikimrSysView::EDbCountersService service,
+ TIntrusivePtr<IDbCounters> counters)
+ {
+ auto [it, inserted] = DatabaseLabeledCounters.try_emplace(database, TDbCounters());
+ if (inserted) {
+ if (ProcessorIds.find(database) == ProcessorIds.end()) {
+ RequestProcessorId(database);
+ }
+
+ if (DbWatcherActorId) {
+ auto evWatch = MakeHolder<NSysView::TEvSysView::TEvWatchDatabase>(database);
+ Send(DbWatcherActorId, evWatch.Release());
+ }
+ }
+ it->second.States[service].Counters = counters;
+ }
+
void Handle(TEvPrivate::TEvSendSummary::TPtr& ev) {
auto prevIntervalEnd = IntervalEnd - TotalInterval;
auto intervalEnd = ev->Get()->IntervalEnd;
@@ -726,12 +767,23 @@ private:
<< "service id# " << SelfId());
for (auto& [database, dbCounters] : DatabaseCounters) {
- SendCounters(database);
+ SendCounters<TEvSysView::TEvSendDbCountersRequest>(database);
}
Schedule(ProcessCountersInterval, new TEvPrivate::TEvProcessCounters());
}
+ void Handle(TEvPrivate::TEvProcessLabeledCounters::TPtr&) {
+ SVLOG_D("Handle TEvPrivate::TEvProcessLabeledCounters: "
+ << "service id# " << SelfId());
+
+ for (auto& [database, dbCounters] : DatabaseLabeledCounters) {
+ SendCounters<TEvSysView::TEvSendDbLabeledCountersRequest>(database);
+ }
+
+ Schedule(ProcessLabeledCountersInterval, new TEvPrivate::TEvProcessLabeledCounters());
+ }
+
void Handle(TEvPrivate::TEvRemoveDatabase::TPtr& ev) {
auto database = ev->Get()->Database;
auto pathId = ev->Get()->PathId;
@@ -749,6 +801,7 @@ private:
ProcessorIds.erase(database);
Attempts.erase(database);
DatabaseCounters.erase(database);
+ DatabaseLabeledCounters.erase(database);
UnresolvedTabletCounters.erase(pathId);
}
@@ -781,7 +834,7 @@ private:
}
if (dbCounters.IsRetrying) {
- SendCounters(database);
+ SendCounters<TEvSysView::TEvSendDbCountersRequest>(database);
}
SVLOG_D("Handle TEvSysView::TEvSendDbCountersResponse: "
@@ -790,6 +843,36 @@ private:
<< ", generation# " << generation);
}
+ void Handle(TEvSysView::TEvSendDbLabeledCountersResponse::TPtr& ev) {
+ const auto& record = ev->Get()->Record;
+ const auto& database = record.GetDatabase();
+ const auto generation = record.GetGeneration();
+
+ auto it = DatabaseLabeledCounters.find(database);
+ if (it == DatabaseLabeledCounters.end()) {
+ return;
+ }
+
+ auto& dbCounters = it->second;
+ if (generation != dbCounters.Generation) {
+ return;
+ }
+
+ dbCounters.IsConfirmed = true;
+ for (auto& [_, state] : dbCounters.States) {
+ state.Confirmed.Swap(state.Current);
+ }
+
+ if (dbCounters.IsRetrying) {
+ SendCounters<TEvSysView::TEvSendDbLabeledCountersRequest>(database);
+ }
+
+ SVLOG_D("Handle TEvSysView::TEvSendDbLabeledCountersResponse: "
+ << "service id# " << SelfId()
+ << ", database# " << database
+ << ", generation# " << generation);
+ }
+
void Handle(TEvSysView::TEvRegisterDbCounters::TPtr& ev) {
const auto service = ev->Get()->Service;
@@ -803,6 +886,15 @@ private:
<< ", path id# " << pathId
<< ", service# " << (int)service);
+ } else if (service == NKikimrSysView::LABELED) {
+ const auto& database = ev->Get()->Database;
+ RegisterDbLabeledCounters(database, service, ev->Get()->Counters);
+
+ SVLOG_D("Handle TEvSysView::TEvRegisterDbLabeledCounters: "
+ << "service id# " << SelfId()
+ << ", database# " << database
+ << ", service# " << (int)service);
+
} else { // register by database name
const auto& database = ev->Get()->Database;
RegisterDbCounters(database, service, ev->Get()->Counters);
@@ -1105,6 +1197,7 @@ private:
};
std::unordered_map<TString, TDbCounters> DatabaseCounters;
+ std::unordered_map<TString, TDbCounters> DatabaseLabeledCounters;
THashMap<TPathId, TIntrusivePtr<IDbCounters>> UnresolvedTabletCounters;
TActorId DbWatcherActorId;
@@ -1115,6 +1208,7 @@ private:
static constexpr size_t SummaryRetryAttempts = 5;
static constexpr TDuration ProcessCountersInterval = TDuration::Seconds(5);
+ static constexpr TDuration ProcessLabeledCountersInterval = TDuration::Minutes(1);
};
THolder<IActor> CreateSysViewService(
diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp
index 898e4cb8c28..2579cf026d4 100644
--- a/ydb/core/sys_view/ut_common.cpp
+++ b/ydb/core/sys_view/ut_common.cpp
@@ -1,4 +1,5 @@
#include "ut_common.h"
+#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
namespace NKikimr {
namespace NSysView {
@@ -25,7 +26,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString
return subdomain;
}
-TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool enableSVP) {
+TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 pqTabletsN, bool enableSVP) {
auto mbusPort = PortManager.GetPort();
auto grpcPort = PortManager.GetPort();
@@ -64,6 +65,11 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool
Client->InitRootScheme("Root");
+ if (pqTabletsN) {
+ NKikimr::NPQ::FillPQConfig(Settings->PQConfig, "/Root/PQ", true);
+ PqTabletIds = Server->StartPQTablets(pqTabletsN);
+ }
+
Endpoint = "localhost:" + ToString(grpcPort);
DriverConfig = NYdb::TDriverConfig().SetEndpoint(Endpoint);
Driver = MakeHolder<NYdb::TDriver>(DriverConfig);
diff --git a/ydb/core/sys_view/ut_common.h b/ydb/core/sys_view/ut_common.h
index e142b969095..0c25bfab589 100644
--- a/ydb/core/sys_view/ut_common.h
+++ b/ydb/core/sys_view/ut_common.h
@@ -19,7 +19,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(
class TTestEnv {
public:
TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, ui32 storagePools = 0,
- bool enableSVP = false);
+ ui32 pqTabletsN = 0, bool enableSVP = false);
~TTestEnv();
Tests::TServer& GetServer() const {
@@ -42,6 +42,14 @@ public:
return Endpoint;
}
+ const Tests::TServerSettings::TPtr GetSettings() const {
+ return Settings;
+ }
+
+ const TVector<ui64>& GetPqTabletIds() const {
+ return PqTabletIds;
+ }
+
TStoragePools GetPools() const;
TStoragePools CreatePoolsForTenant(const TString& tenant);
@@ -57,6 +65,7 @@ private:
TString Endpoint;
NYdb::TDriverConfig DriverConfig;
THolder<NYdb::TDriver> Driver;
+ TVector<ui64> PqTabletIds;
};
} // NSysView
diff --git a/ydb/core/sys_view/ut_counters.cpp b/ydb/core/sys_view/ut_counters.cpp
index 51bc0c49650..66623084656 100644
--- a/ydb/core/sys_view/ut_counters.cpp
+++ b/ydb/core/sys_view/ut_counters.cpp
@@ -75,7 +75,7 @@ void CreateDatabasesAndTables(TTestEnv& env) {
Y_UNIT_TEST_SUITE(DbCounters) {
Y_UNIT_TEST(TabletsSimple) {
- TTestEnv env(1, 2, 0, true);
+ TTestEnv env(1, 2, 0, 0, true);
CreateDatabasesAndTables(env);
for (size_t iter = 0; iter < 30; ++iter) {
diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp
index 898272e5caa..502b9174e41 100644
--- a/ydb/core/sys_view/ut_kqp.cpp
+++ b/ydb/core/sys_view/ut_kqp.cpp
@@ -1174,7 +1174,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
auto nowUs = TInstant::Now().MicroSeconds();
- TTestEnv env(1, 4, 0, true);
+ TTestEnv env(1, 4, 0, 0, true);
CreateTenantsAndTables(env);
TTableClient client(env.GetDriver());
@@ -1226,7 +1226,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
constexpr ui64 partitionCount = 5;
- TTestEnv env(1, 4, 0, true);
+ TTestEnv env(1, 4, 0, 0, true);
CreateTenantsAndTables(env, true, partitionCount);
TTableClient client(env.GetDriver());
@@ -1256,7 +1256,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
constexpr ui64 partitionCount = 5;
- TTestEnv env(1, 4, 0, true);
+ TTestEnv env(1, 4, 0, 0, true);
CreateTenantsAndTables(env, true, partitionCount);
TTableClient client(env.GetDriver());
diff --git a/ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt b/ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt
index 7331e727891..d2ef408e870 100644
--- a/ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt
+++ b/ydb/core/sys_view/ut_kqp/CMakeLists.darwin.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-sys_view-ut_kqp PUBLIC
cpp-testing-unittest
cpp-yson-node
kqp-ut-common
+ persqueue-ut-common
core-testlib-default
cpp-client-draft
)
@@ -38,6 +39,7 @@ target_sources(ydb-core-sys_view-ut_kqp PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_kqp.cpp
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_labeled.cpp
)
add_test(
NAME
diff --git a/ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt b/ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt
index e61f991e75c..d3d8170c6a8 100644
--- a/ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/sys_view/ut_kqp/CMakeLists.linux-aarch64.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-sys_view-ut_kqp PUBLIC
cpp-testing-unittest
cpp-yson-node
kqp-ut-common
+ persqueue-ut-common
core-testlib-default
cpp-client-draft
)
@@ -40,6 +41,7 @@ target_sources(ydb-core-sys_view-ut_kqp PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_kqp.cpp
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_labeled.cpp
)
add_test(
NAME
diff --git a/ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt b/ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt
index 4e7d32c305e..0b13142c672 100644
--- a/ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt
+++ b/ydb/core/sys_view/ut_kqp/CMakeLists.linux.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-core-sys_view-ut_kqp PUBLIC
cpp-testing-unittest
cpp-yson-node
kqp-ut-common
+ persqueue-ut-common
core-testlib-default
cpp-client-draft
)
@@ -42,6 +43,7 @@ target_sources(ydb-core-sys_view-ut_kqp PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_kqp.cpp
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/sys_view/ut_labeled.cpp
)
add_test(
NAME
diff --git a/ydb/core/sys_view/ut_labeled.cpp b/ydb/core/sys_view/ut_labeled.cpp
new file mode 100644
index 00000000000..2ef24fdd2d0
--- /dev/null
+++ b/ydb/core/sys_view/ut_labeled.cpp
@@ -0,0 +1,331 @@
+#include "ut_common.h"
+
+#include "ut_common.h"
+
+#include <ydb/core/base/counters.h>
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
+
+namespace NKikimr {
+namespace NSysView {
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+using namespace NYdb::NScheme;
+
+const ui32 partitionsN = 32;
+const TString topicName = "topic";
+
+
+namespace {
+
+void CreateDatabase(TTestEnv& env, const TString& databaseName) {
+ auto subdomain = GetSubDomainDeclareSettings(databaseName);
+ UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
+ env.GetClient().CreateExtSubdomain("/Root", subdomain));
+
+ env.GetTenants().Run("/Root/" + databaseName, 1);
+
+ auto subdomainSettings = GetSubDomainDefaultSettings(databaseName, env.GetPools());
+ subdomainSettings.SetExternalSysViewProcessor(true);
+ subdomainSettings.SetExternalSchemeShard(true);
+ UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
+ env.GetClient().AlterExtSubdomain("/Root", subdomainSettings));
+}
+
+bool CheckCounter(::NMonitoring::TDynamicCounterPtr group, const char* sensorName, ui32 refValue,
+ bool isDerivative) {
+ auto value = group->GetNamedCounter("name", sensorName, isDerivative)->Val();
+ return (value == refValue);
+}
+
+bool CheckLtCounter(::NMonitoring::TDynamicCounterPtr group, const char* sensorName, ui32 refValue,
+ bool isDerivative) {
+ auto value = group->GetNamedCounter("name", sensorName, isDerivative)->Val();
+ return (value <= refValue);
+}
+
+bool CheckLabeledCounters(::NMonitoring::TDynamicCounterPtr databaseGroup, const TString& dbId,
+ std::function<bool(::NMonitoring::TDynamicCounterPtr)> particularCountersCheck) {
+ bool isGood{true};
+ Y_UNUSED(dbId);
+ auto topicGroup = databaseGroup
+ ->GetSubgroup("cloud_id", "")
+ ->GetSubgroup("folder_id", "")
+ ->GetSubgroup("database_id", "")
+ ->GetSubgroup("host", "")
+ ->GetSubgroup("topic", topicName);
+ {
+ {
+ TStringStream ss;
+ topicGroup->OutputHtml(ss);
+ Cerr << ss.Str() << Endl;
+ }
+
+ isGood &= particularCountersCheck(topicGroup);
+ }
+
+ return isGood;
+}
+
+void GetCounters(TTestEnv& env, const TString& databaseName, const TString& databasePath,
+ std::function<bool(::NMonitoring::TDynamicCounterPtr)> particularCountersCheck) {
+ for (size_t iter = 0; iter < 30; ++iter) {
+ Cerr << "iteration " << iter << Endl;
+
+ bool checkDb = false;
+
+ for (ui32 nodeId = 0; nodeId < env.GetServer().GetRuntime()->GetNodeCount(); ++nodeId) {
+ auto counters = env.GetServer().GetRuntime()->GetAppData(nodeId).Counters;
+ auto labeledGroup = GetServiceCounters(counters, "labeled", false);
+ Y_VERIFY(labeledGroup);
+
+ auto databaseGroup = labeledGroup->FindSubgroup("database", databasePath);
+ if (databaseGroup) {
+ checkDb = CheckLabeledCounters(databaseGroup, databaseName, particularCountersCheck);
+ }
+ }
+
+ if (checkDb) {
+ return;
+ }
+
+ Sleep(TDuration::Seconds(5));
+ }
+ UNIT_ASSERT_C(false, "out of iterations");
+}
+
+} // namespace
+
+Y_UNIT_TEST_SUITE(LabeledDbCounters) {
+
+ Y_UNIT_TEST(OneTablet) {
+ TTestEnv env(1, 2, 0, 1, true);
+ const TString databaseName = NPQ::TTabletPreparationParameters().databaseId;
+ const TString databasePath = NPQ::TTabletPreparationParameters().databasePath;
+ auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor();
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false);
+ isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false);
+ isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false);
+
+ return isGood;
+ };
+
+ CreateDatabase(env, databaseName);
+ NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(),
+ env.GetPqTabletIds()[0], edge);
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+
+ Y_UNIT_TEST(OneTabletRestart) {
+ TTestEnv env(1, 2, 0, 1, true);
+ const TString databaseName = NPQ::TTabletPreparationParameters().databaseId;
+ const TString databasePath = NPQ::TTabletPreparationParameters().databasePath;
+ auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor();
+
+ CreateDatabase(env, databaseName);
+ NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(),
+ env.GetPqTabletIds()[0], edge);
+
+ {
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ {
+ TStringStream ss;
+ topicGroup->OutputHtml(ss);
+ Cerr << ss.Str() << Endl;
+ }
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false);
+ isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false);
+ isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false);
+
+ return isGood;
+ };
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+
+ env.GetServer().GetRuntime()->Register(CreateTabletKiller(env.GetPqTabletIds()[0]));
+
+ {
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckLtCounter(topicGroup, "topic.max_partition_uptime_milliseconds",
+ TDuration::Minutes(1).MilliSeconds() + 200, false);
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false);
+ return isGood;
+ };
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+ }
+
+ Y_UNIT_TEST(TwoTablets) {
+ TTestEnv env(1, 2, 0, 2, true);
+ const TString databaseName = NPQ::TTabletPreparationParameters().databaseId;
+ const TString databasePath = NPQ::TTabletPreparationParameters().databasePath;
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false);
+ isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false);
+ isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false);
+
+ return isGood;
+ };
+
+ CreateDatabase(env, databaseName);
+ for (auto& tbId : env.GetPqTabletIds()) {
+ NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(),
+ tbId, env.GetServer().GetRuntime()->AllocateEdgeActor());
+ }
+
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+
+ Y_UNIT_TEST(TwoTabletsKillOneTablet) {
+ TTestEnv env(1, 2, 0, 2, true);
+ const TString databaseName = NPQ::TTabletPreparationParameters().databaseId;
+ const TString databasePath = NPQ::TTabletPreparationParameters().databasePath;
+ auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor();
+ CreateDatabase(env, databaseName);
+ for (auto& tbId : env.GetPqTabletIds()) {
+ NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(),
+ tbId, edge);
+ }
+
+ {
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false);
+ isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false);
+ isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false);
+
+ return isGood;
+ };
+
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+
+ for (ui32 i = 0; i < env.GetServer().StaticNodes() + env.GetServer().DynamicNodes(); i++) {
+ env.GetClient().MarkNodeInHive(env.GetServer().GetRuntime(), i, false);
+ }
+ env.GetServer().GetRuntime()->Register(CreateTabletKiller(env.GetPqTabletIds()[0]));
+
+ {
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false);
+
+ return isGood;
+ };
+
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+ }
+
+ Y_UNIT_TEST(TwoTabletsDisconnectOneNode) {
+ TTestEnv env(1, 2, 0, 2, true);
+ const TString databaseName = NPQ::TTabletPreparationParameters().databaseId;
+ const TString databasePath = NPQ::TTabletPreparationParameters().databasePath;
+ auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor();
+ CreateDatabase(env, databaseName);
+ for (auto& tbId : env.GetPqTabletIds()) {
+ NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(),
+ tbId, edge);
+ }
+
+ {
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false);
+ isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false);
+ isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false);
+
+ return isGood;
+ };
+
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+
+ env.GetServer().GetRuntime()->DisconnectNodes(0, 1, false);
+ env.GetServer().GetRuntime()->DisconnectNodes(0, 2, false);
+
+ {
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false);
+
+ return isGood;
+ };
+
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+ }
+
+ Y_UNIT_TEST(TwoTabletsDisconnectOneNodeHardWay) {
+ TTestEnv env(1, 2, 0, 2, true);
+ const TString databaseName = NPQ::TTabletPreparationParameters().databaseId;
+ const TString databasePath = NPQ::TTabletPreparationParameters().databasePath;
+ auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor();
+ CreateDatabase(env, databaseName);
+ for (auto& tbId : env.GetPqTabletIds()) {
+ NPQ::PQTabletPrepare({.partitions=partitionsN}, {}, *env.GetServer().GetRuntime(),
+ tbId, edge);
+ }
+
+ {
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN*2, false);
+ isGood &= CheckCounter(topicGroup, "topic.partitions.write_speed_bytes_per_second", 50'000'000, false);
+ isGood &= CheckCounter(topicGroup, "topic.producers_count", 0, false);
+
+ return isGood;
+ };
+
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+
+ for (ui32 i = 0; i < env.GetServer().StaticNodes() + env.GetServer().DynamicNodes(); i++) {
+ env.GetClient().MarkNodeInHive(env.GetServer().GetRuntime(), i, false);
+ }
+
+ env.GetServer().GetRuntime()->DisconnectNodes(0, 1, true);
+ env.GetServer().GetRuntime()->DisconnectNodes(0, 2, true);
+
+ {
+ auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) {
+ bool isGood{true};
+
+ isGood &= CheckCounter(topicGroup, "topic.partitions.alive_count", partitionsN, false);
+ isGood &= CheckCounter(topicGroup, "topic.partitions.total_count", partitionsN, false);
+ return isGood;
+ };
+
+ Sleep(TDuration::Minutes(1));
+ GetCounters(env, databaseName, databasePath, check);
+ }
+ }
+}
+
+} // NSysView
+} // NKikimr
diff --git a/ydb/core/tablet/CMakeLists.txt b/ydb/core/tablet/CMakeLists.txt
index 80834d9cfc9..ec3f900097d 100644
--- a/ydb/core/tablet/CMakeLists.txt
+++ b/ydb/core/tablet/CMakeLists.txt
@@ -33,6 +33,7 @@ target_link_libraries(ydb-core-tablet PUBLIC
target_sources(ydb-core-tablet PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet/bootstrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet/labeled_counters_merger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet/labeled_db_counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet/node_tablet_monitor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet/node_whiteboard.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet/pipe_tracker.cpp
@@ -58,4 +59,5 @@ target_sources(ydb-core-tablet PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet/tablet_sys.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet/tablet_tracing_signals.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet/private/aggregated_counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet/private/labeled_db_counters.cpp
)
diff --git a/ydb/core/tablet/labeled_counters_merger.h b/ydb/core/tablet/labeled_counters_merger.h
index c3c1a9653a1..097696b9d27 100644
--- a/ydb/core/tablet/labeled_counters_merger.h
+++ b/ydb/core/tablet/labeled_counters_merger.h
@@ -46,6 +46,7 @@ public:
target.SetValue(value);
target.SetType(type);
target.SetAggregateFunc(func);
+ target.SetNameId(source.GetNameId());
}
}
diff --git a/ydb/core/tablet/labeled_db_counters.cpp b/ydb/core/tablet/labeled_db_counters.cpp
new file mode 100644
index 00000000000..1b4e1b86163
--- /dev/null
+++ b/ydb/core/tablet/labeled_db_counters.cpp
@@ -0,0 +1,12 @@
+#include "labeled_db_counters.h"
+#include "private/labeled_db_counters.h"
+
+
+namespace NKikimr {
+
+TIntrusivePtr<NSysView::IDbCounters> CreateLabeledDbCounters(
+ ::NMonitoring::TDynamicCounterPtr externalGroup) {
+ return MakeIntrusive<NPrivate::TDbLabeledCounters>(externalGroup);
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/tablet/labeled_db_counters.h b/ydb/core/tablet/labeled_db_counters.h
new file mode 100644
index 00000000000..123f1b9ad75
--- /dev/null
+++ b/ydb/core/tablet/labeled_db_counters.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+#include "tablet_counters.h"
+
+
+namespace NKikimr {
+
+class ILabeledCounters : public virtual TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<ILabeledCounters>;
+
+ virtual void Apply(ui64 TabletID, const NKikimr::TTabletLabeledCountersBase* labeledCounters) = 0;
+ virtual void ForgetTablet(ui64 tabletID) = 0;
+ virtual void UseDatabase(const TString& database) { Y_UNUSED(database); }
+};
+
+TIntrusivePtr<NSysView::IDbCounters> CreateLabeledDbCounters(::NMonitoring::TDynamicCounterPtr externalGroup);
+
+} // namespace NKikimr
diff --git a/ydb/core/tablet/private/aggregated_counters.cpp b/ydb/core/tablet/private/aggregated_counters.cpp
index 2dccbf9e323..66ed7517708 100644
--- a/ydb/core/tablet/private/aggregated_counters.cpp
+++ b/ydb/core/tablet/private/aggregated_counters.cpp
@@ -574,6 +574,43 @@ void TAggregatedLabeledCounters::FillGetRequestV2(
}
}
+void TAggregatedLabeledCounters::ToProto(NKikimrLabeledCounters::TTabletLabeledCounters& labeledCounters) const {
+ if (Changed) {
+ for (ui32 idx : xrange(CountersByTabletId.size())) {
+ Recalc(idx);
+ }
+ Changed = false;
+ }
+ ui32 updatedCount{0};
+ for (ui32 i = 0; i < Size(); ++i) {
+ if (strlen(Names[i]) != 0) {
+ if (labeledCounters.LabeledCounterSize() <= updatedCount) {
+ labeledCounters.AddLabeledCounter();
+ }
+ auto& labeledCounter = *labeledCounters.MutableLabeledCounter(updatedCount);
+ labeledCounter.SetValue(GetValue(i));
+ labeledCounter.SetNameId(i);
+ labeledCounter.SetAggregateFunc(NKikimr::TLabeledCounterOptions::EAggregateFunc(AggrFunc[i]));
+ labeledCounter.SetType(NKikimr::TLabeledCounterOptions::ECounterType(Types[i]));
+ ++updatedCount;
+ }
+ }
+}
+
+void TAggregatedLabeledCounters::FromProto(
+ NMonitoring::TDynamicCounterPtr group,
+ const NKikimrLabeledCounters::TTabletLabeledCounters& labeledCounters) const {
+ for (const auto& counter : labeledCounters.GetLabeledCounter()) {
+ const ui32 nameId{counter.GetNameId()};
+ if (strlen(Names[nameId]) != 0) {
+ // TODO: ASDFGS if CT_TIMELAG -> ctx.Now() - counters.GetValue
+ const bool derived = counter.GetType() == TLabeledCounterOptions::CT_DERIV;
+ auto namedCounter = group->GetNamedCounter("name", Names[nameId], derived);
+ *namedCounter = counter.GetValue();
+ }
+ }
+}
+
void TAggregatedLabeledCounters::Recalc(ui32 idx) const {
Y_VERIFY(idx < Ids.size());
auto &counters = CountersByTabletId[idx];
@@ -581,7 +618,11 @@ void TAggregatedLabeledCounters::Recalc(ui32 idx) const {
std::pair<ui64, ui64> aggrVal{0,0};
ui64 cntCount = counters.size();
- Y_VERIFY(cntCount > 0);
+ // Y_VERIFY(cntCount > 0);
+ if (cntCount == 0) {
+ return;
+ }
+
if (aggrFunc == TTabletLabeledCountersBase::EAggregateFunc::EAF_MIN)
aggrVal = counters.begin()->second;
diff --git a/ydb/core/tablet/private/aggregated_counters.h b/ydb/core/tablet/private/aggregated_counters.h
index 24e5ab04d36..c099942d8e7 100644
--- a/ydb/core/tablet/private/aggregated_counters.h
+++ b/ydb/core/tablet/private/aggregated_counters.h
@@ -156,6 +156,10 @@ public:
void FillGetRequestV2(NKikimr::TTabletLabeledCountersResponseContext* context, const TString& group) const;
+ void ToProto(NKikimrLabeledCounters::TTabletLabeledCounters& labeledCounters) const;
+ void FromProto(NMonitoring::TDynamicCounterPtr group,
+ const NKikimrLabeledCounters::TTabletLabeledCounters& labeledCounters) const;
+
private:
//
::NMonitoring::TDynamicCounterPtr CounterGroup;
diff --git a/ydb/core/tablet/private/labeled_db_counters.cpp b/ydb/core/tablet/private/labeled_db_counters.cpp
new file mode 100644
index 00000000000..ddf01f56824
--- /dev/null
+++ b/ydb/core/tablet/private/labeled_db_counters.cpp
@@ -0,0 +1,116 @@
+#include "labeled_db_counters.h"
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <util/string/split.h>
+#include <ydb/core/sys_view/service/sysview_service.h>
+
+namespace NKikimr::NPrivate {
+
+/*
+** class TPQCounters
+ */
+
+THashMap<TString, TAutoPtr<TAggregatedLabeledCounters>> TPQCounters::LabeledCountersByGroupReference = {};
+
+TPQCounters::TPQCounters(NMonitoring::TDynamicCounterPtr counters) {
+ Group = counters;
+}
+
+void TPQCounters::Apply(ui64 tabletId, const NKikimr::TTabletLabeledCountersBase* labeledCounters) {
+ const TString group = labeledCounters->GetGroup();
+ TString groupNames;
+
+ if (!LabeledCountersByGroup.Has(group)) {
+ TVector<TString> rr;
+ StringSplitter(group).Split('|').Collect(&rr);
+ for (ui32 i = 0; i < rr.size(); ++i) {
+ if (i > 0)
+ groupNames += '|';
+ groupNames += labeledCounters->GetGroupName(i);
+ }
+
+ if (!LabeledCountersByGroupReference.contains(groupNames)) {
+ LabeledCountersByGroupReference.emplace(groupNames, new TAggregatedLabeledCounters(
+ labeledCounters->GetCounters().Size(), labeledCounters->GetAggrFuncs(),
+ labeledCounters->GetNames(), labeledCounters->GetTypes(), groupNames));
+ }
+ }
+
+ auto& el = LabeledCountersByGroup.InsertIfAbsent(group, new TAggregatedLabeledCounters(
+ labeledCounters->GetCounters().Size(), labeledCounters->GetAggrFuncs(),
+ labeledCounters->GetNames(), labeledCounters->GetTypes(), groupNames));
+
+ for (ui32 i = 0, N = labeledCounters->GetCounters().Size(); i < N; ++i) {
+ const ui64& value = labeledCounters->GetCounters()[i].Get();
+ // FIXME (?):
+ // const ui64& id = labeledCounters->GetIds()[i].Get();
+ const ui64 id = i;
+ el->SetValue(tabletId, i, value, id);
+ }
+}
+
+void TPQCounters::ForgetTablet(ui64 tabletId) {
+ for (auto& bucket : LabeledCountersByGroup.Buckets) {
+ TWriteGuard guard(bucket.GetLock());
+ auto& map = bucket.GetMap();
+ for (auto iterator = map.begin(); iterator != map.end();) {
+ bool empty = iterator->second->ForgetTablet(tabletId);
+ if (empty) {
+ auto eraseIterator = iterator;
+ ++iterator;
+ map.erase(eraseIterator);
+ } else {
+ ++iterator;
+ }
+ }
+ }
+}
+
+/*
+** class TDbLabeledCounters
+ */
+
+TDbLabeledCounters::TDbLabeledCounters()
+: TPQCounters(MakeIntrusive<::NMonitoring::TDynamicCounters>())
+{}
+
+TDbLabeledCounters::TDbLabeledCounters(::NMonitoring::TDynamicCounterPtr counters)
+: TPQCounters(counters)
+{}
+
+void TDbLabeledCounters::ToProto(NKikimr::NSysView::TDbServiceCounters& counters) {
+ counters.ClearLabeledCounters();
+ for (auto& bucket : LabeledCountersByGroup.Buckets) {
+ TWriteGuard guard(bucket.GetLock());
+ for (auto& [group, labeledCounters] : bucket.GetMap()) {
+ auto* proto = counters.FindOrAddLabeledCounters(group);
+ auto* labeled = proto->MutableAggregatedPerTablets();
+ labeledCounters->ToProto(*labeled);
+ }
+ }
+}
+
+void TDbLabeledCounters::FromProto(NKikimr::NSysView::TDbServiceCounters& counters) {
+ for (auto& proto : *counters.Proto().MutableLabeledCounters()) {
+ TVector<TString> groups;
+ TVector<TString> groupNames = {"topic", "important", "consumer"};
+ Y_VERIFY(proto.GetAggregatedPerTablets().GetDelimiter() == "|");
+ StringSplitter(proto.GetAggregatedPerTablets().GetGroup()).Split('|').Collect(&groups);
+ auto countersGroup = Group;
+ // FIXME: a little hack here: we have consumer - important - topic group names in proto
+ // that's why we iterate the group in reverse order
+ // this comes from: ydb/core/persqueue/user_info.h:310 (TUserInfo::TUserInfo)
+ std::reverse(groups.begin(), groups.end());
+ for (size_t i = 0; i < groups.size(); ++i) {
+ if (i != 1) {
+ countersGroup = countersGroup->GetSubgroup(groupNames[i], groups[i]);
+ }
+ }
+ const TString groupNamesStr = (groups.size() == 3) ? "client|important|topic" : "topic";
+
+ LabeledCountersByGroupReference[groupNamesStr]->FromProto(countersGroup,
+ proto.GetAggregatedPerTablets());
+ }
+}
+
+} // namespace NKikimr::NPrivate
diff --git a/ydb/core/tablet/private/labeled_db_counters.h b/ydb/core/tablet/private/labeled_db_counters.h
new file mode 100644
index 00000000000..13808848b0b
--- /dev/null
+++ b/ydb/core/tablet/private/labeled_db_counters.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include <ydb/core/sys_view/service/db_counters.h>
+#include <ydb/core/tablet/labeled_db_counters.h>
+#include <ydb/core/util/concurrent_rw_hash.h>
+
+#include "aggregated_counters.h"
+
+
+namespace NKikimr::NPrivate {
+
+class TPQCounters : public ILabeledCounters {
+protected:
+ TConcurrentRWHashMap<TString, TAutoPtr<TAggregatedLabeledCounters>, 256> LabeledCountersByGroup;
+ NMonitoring::TDynamicCounterPtr Group;
+
+public:
+ using TPtr = TIntrusivePtr<TPQCounters>;
+
+ explicit TPQCounters(NMonitoring::TDynamicCounterPtr counters);
+
+ void Apply(ui64 tabletID, const NKikimr::TTabletLabeledCountersBase* labeledCounters) override;
+ void ForgetTablet(ui64 tabletID) override;
+
+ static THashMap<TString, TAutoPtr<TAggregatedLabeledCounters>> LabeledCountersByGroupReference;
+};
+
+class TDbLabeledCounters : public TPQCounters, public NSysView::IDbCounters {
+public:
+ using TPtr = TIntrusivePtr<TDbLabeledCounters>;
+
+ TDbLabeledCounters();
+ explicit TDbLabeledCounters(::NMonitoring::TDynamicCounterPtr counters);
+
+ void ToProto(NKikimr::NSysView::TDbServiceCounters& counters) override;
+ void FromProto(NKikimr::NSysView::TDbServiceCounters& counters) override;
+};
+
+} // namespace NKikimr::NPrivate
diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp
index 0ebbe4aa0a4..111f530b837 100644
--- a/ydb/core/tablet/tablet_counters_aggregator.cpp
+++ b/ydb/core/tablet/tablet_counters_aggregator.cpp
@@ -1,7 +1,9 @@
#include "tablet_counters_aggregator.h"
#include "tablet_counters_app.h"
#include "labeled_counters_merger.h"
+#include "labeled_db_counters.h"
#include "private/aggregated_counters.h"
+#include "private/labeled_db_counters.h"
#include <library/cpp/actors/core/log.h>
#include <ydb/core/mon/mon.h>
@@ -161,6 +163,12 @@ public:
}
}
+ void ApplyLabeledDbCounters(const TString& dbName, ui64 tabletId,
+ const TTabletLabeledCountersBase* labeledCounters, const TActorContext& ctx) {
+ auto iterDbLabeled = GetLabeledDbCounters(dbName, ctx);
+ iterDbLabeled->Apply(tabletId, labeledCounters);
+ }
+
void ForgetTablet(ui64 tabletId, TTabletTypes::EType tabletType, TPathId tenantPathId) {
AllTypes->Forget(tabletId);
// and now erase from every other path
@@ -172,9 +180,15 @@ public:
if (auto itPath = CountersByPathId.find(tenantPathId); itPath != CountersByPathId.end()) {
itPath->second->Forget(tabletId, tabletType);
}
- //and from all labeledCounters that could have this tablet
- auto iterTabletTypeAndGroup = LabeledCountersByTabletTypeAndGroup.lower_bound(std::make_pair(tabletType, TString()));
- for (; iterTabletTypeAndGroup != LabeledCountersByTabletTypeAndGroup.end() && iterTabletTypeAndGroup->first.first == tabletType; ) {
+
+ for (auto iter = LabeledDbCounters.begin(); iter != LabeledDbCounters.end(); ++iter) {
+ iter->second->ForgetTablet(tabletId);
+ }
+ // and from all labeledCounters that could have this tablet
+ auto iterTabletTypeAndGroup =
+ LabeledCountersByTabletTypeAndGroup.lower_bound(std::make_pair(tabletType, TString()));
+ for (; iterTabletTypeAndGroup != LabeledCountersByTabletTypeAndGroup.end() &&
+ iterTabletTypeAndGroup->first.first == tabletType; ) {
bool empty = iterTabletTypeAndGroup->second->ForgetTablet(tabletId);
if (empty) {
iterTabletTypeAndGroup = LabeledCountersByTabletTypeAndGroup.erase(iterTabletTypeAndGroup);
@@ -310,6 +324,10 @@ public:
CountersByPathId.erase(pathId);
}
+ void RemoveTabletsByDbPath(const TString& dbPath) {
+ LabeledDbCounters.erase(dbPath);
+ }
+
private:
// subgroups
class TTabletCountersForTabletType : public TThrRefBase {
@@ -1049,8 +1067,8 @@ public:
: ActorSystem(actorSystem)
{}
- void OnDatabaseRemoved(const TString&, TPathId pathId) override {
- auto evRemove = MakeHolder<TEvTabletCounters::TEvRemoveDatabase>(pathId);
+ void OnDatabaseRemoved(const TString& dbPath, TPathId pathId) override {
+ auto evRemove = MakeHolder<TEvTabletCounters::TEvRemoveDatabase>(dbPath, pathId);
auto aggregator = MakeTabletCountersAggregatorID(ActorSystem->NodeId, false);
ActorSystem->Send(aggregator, evRemove.Release());
}
@@ -1078,6 +1096,27 @@ private:
return dbCounters;
}
+ NPrivate::TDbLabeledCounters::TPtr GetLabeledDbCounters(const TString& dbName, const TActorContext& ctx) {
+ auto it = LabeledDbCounters.find(dbName);
+ if (it != LabeledDbCounters.end()) {
+ return it->second;
+ }
+
+ auto dbCounters = MakeIntrusive<NPrivate::TDbLabeledCounters>();
+ LabeledDbCounters[dbName] = dbCounters;
+
+ auto evRegister = MakeHolder<NSysView::TEvSysView::TEvRegisterDbCounters>(
+ NKikimrSysView::LABELED, dbName, dbCounters);
+ ctx.Send(NSysView::MakeSysViewServiceID(ctx.SelfID.NodeId()), evRegister.Release());
+
+ if (DbWatcherActorId) {
+ auto evWatch = MakeHolder<NSysView::TEvSysView::TEvWatchDatabase>(dbName);
+ ctx.Send(DbWatcherActorId, evWatch.Release());
+ }
+
+ return dbCounters;
+ }
+
private:
::NMonitoring::TDynamicCounterPtr Counters;
TTabletCountersForTabletTypePtr AllTypes;
@@ -1085,6 +1124,7 @@ private:
typedef THashMap<TPathId, TIntrusivePtr<TTabletCountersForDb>> TCountersByPathId;
typedef TMap<TTabletTypes::EType, THolder<TTabletCountersBase>> TAppCountersByTabletType;
+ typedef THashMap<TString, TIntrusivePtr<NPrivate::TDbLabeledCounters>> TLabeledCountersByDbPath;
typedef TMap<std::pair<TTabletTypes::EType, TString>, TAutoPtr<NPrivate::TAggregatedLabeledCounters>> TLabeledCountersByTabletTypeAndGroup;
typedef THashMap<ui64, std::pair<TAutoPtr<TTabletCountersBase>, TAutoPtr<TTabletCountersBase>>> TQuietTabletCounters;
@@ -1093,6 +1133,7 @@ private:
TActorId DbWatcherActorId;
TAppCountersByTabletType LimitedAppCounters; // without txs
TYdbTabletCountersPtr YdbCounters;
+ TLabeledCountersByDbPath LabeledDbCounters;
TLabeledCountersByTabletTypeAndGroup LabeledCountersByTabletTypeAndGroup;
TQuietTabletCounters QuietTabletCounters;
};
@@ -1202,8 +1243,14 @@ TTabletCountersAggregatorActor::HandleWork(TEvTabletCounters::TEvTabletAddLabele
if (msg->LabeledCounters.Get()->GetDatabasePath()) {
if (msg->TabletType == TTabletTypes::PersQueue) {
LOG_DEBUG_S(ctx, NKikimrServices::TABLET_AGGREGATOR,
- "got labeledCounters from db" << msg->LabeledCounters.Get()->GetDatabasePath());
+ "got labeledCounters from db: " << msg->LabeledCounters.Get()->GetDatabasePath() <<
+ "; tablet: " << msg->TabletID);
+ TabletMon->ApplyLabeledDbCounters(msg->LabeledCounters.Get()->GetDatabasePath().GetRef(), msg->TabletID, msg->LabeledCounters.Get(), ctx);
} else {
+ LOG_ERROR_S(ctx, NKikimrServices::TABLET_AGGREGATOR,
+ "got labeledCounters from unknown Tablet Type: " << msg->TabletType <<
+ "; db: " << msg->LabeledCounters.Get()->GetDatabasePath() <<
+ "; tablet: " << msg->TabletID);
return;
}
} else {
@@ -1353,8 +1400,8 @@ TTabletCountersAggregatorActor::HandleWork(TEvTabletCounters::TEvTabletLabeledCo
////////////////////////////////////////////
void TTabletCountersAggregatorActor::HandleWork(TEvTabletCounters::TEvRemoveDatabase::TPtr& ev) {
-
TabletMon->RemoveTabletsByPathId(ev->Get()->PathId);
+ TabletMon->RemoveTabletsByDbPath(ev->Get()->DbPath);
}
////////////////////////////////////////////
diff --git a/ydb/core/tablet/tablet_counters_aggregator.h b/ydb/core/tablet/tablet_counters_aggregator.h
index 1c39b3295e8..be9e336d85c 100644
--- a/ydb/core/tablet/tablet_counters_aggregator.h
+++ b/ydb/core/tablet/tablet_counters_aggregator.h
@@ -105,10 +105,12 @@ struct TEvTabletCounters {
};
struct TEvRemoveDatabase : public TEventLocal<TEvRemoveDatabase, EvRemoveDatabase> {
+ const TString DbPath;
const TPathId PathId;
- explicit TEvRemoveDatabase(TPathId pathId)
- : PathId(pathId)
+ TEvRemoveDatabase(const TString& dbPath, TPathId pathId)
+ : DbPath(dbPath)
+ , PathId(pathId)
{}
};
diff --git a/ydb/core/tablet/tablet_counters_aggregator_ut.cpp b/ydb/core/tablet/tablet_counters_aggregator_ut.cpp
index 013d292c243..f94a11f898f 100644
--- a/ydb/core/tablet/tablet_counters_aggregator_ut.cpp
+++ b/ydb/core/tablet/tablet_counters_aggregator_ut.cpp
@@ -1,5 +1,7 @@
#include "tablet_counters_aggregator.h"
+#include "private/labeled_db_counters.h"
+#include <ydb/core/base/counters.h>
#include <ydb/core/testlib/basics/runtime.h>
#include <ydb/core/testlib/basics/appdata.h>
@@ -824,6 +826,101 @@ Y_UNIT_TEST_SUITE(TTabletLabeledCountersAggregator) {
UNIT_ASSERT_VALUES_EQUAL(res[1], "cons/aaa|1|aba/caba/daba|man");
}
+ Y_UNIT_TEST(DbAggregation) {
+ TVector<TActorId> cc;
+ TActorId aggregatorId;
+
+ TTestBasicRuntime runtime(1);
+
+ runtime.Initialize(TAppPrepare().Unwrap());
+ runtime.GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true);
+
+ TActorId edge = runtime.AllocateEdgeActor();
+
+ runtime.SetRegistrationObserverFunc([&cc, &aggregatorId]
+ (TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
+ TTestActorRuntime::DefaultRegistrationObserver(runtime, parentId, actorId);
+ if (parentId == aggregatorId) {
+ cc.push_back(actorId);
+ }
+ });
+
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
+ runtime.DispatchEvents(options);
+ for (const auto& a : cc) {
+ THolder<TEvInterconnect::TEvNodesInfo> nodesInfo = MakeHolder<TEvInterconnect::TEvNodesInfo>();
+ nodesInfo->Nodes.emplace_back(TEvInterconnect::TNodeInfo(1, "::", "localhost", "localhost", 1234, TNodeLocation()));
+ nodesInfo->Nodes.emplace_back(TEvInterconnect::TNodeInfo(2, "::", "localhost", "localhost", 1234, TNodeLocation()));
+ nodesInfo->Nodes.emplace_back(TEvInterconnect::TNodeInfo(3, "::", "localhost", "localhost", 1234, TNodeLocation()));
+ runtime.Send(new NActors::IEventHandle(a, edge, nodesInfo.Release()), 0, true);
+ }
+
+ NPrivate::TDbLabeledCounters PQCounters;
+
+ const size_t namesN{5};
+ std::array<const char *, namesN> names;
+ names.fill("");
+ names[0] = "whatever";
+ names[1] = "whenever";
+ std::array<const char *, namesN> groupNames;
+ groupNames.fill("topic");
+ groupNames[1] = "user||topic";
+ std::array<ui8, namesN> types;
+ types.fill(static_cast<ui8>(TLabeledCounterOptions::CT_SIMPLE));
+
+ std::array<ui8, namesN> functions;
+ functions.fill(static_cast<ui8>(TLabeledCounterOptions::EAF_SUM));
+ functions[1] = static_cast<ui8>(TLabeledCounterOptions::EAF_MAX);
+
+ {
+ NKikimr::TTabletLabeledCountersBase labeledCounters(namesN, &names[0], &types[0], &functions[0],
+ "some_stream", &groupNames[0], 1, "/Root/PQ1");
+ labeledCounters.GetCounters()[0].Set(10);
+ labeledCounters.GetCounters()[1].Set(10);
+ PQCounters.Apply(0, &labeledCounters);
+ labeledCounters.GetCounters()[0].Set(11);
+ labeledCounters.GetCounters()[1].Set(100);
+ PQCounters.Apply(1, &labeledCounters);
+ labeledCounters.GetCounters()[0].Set(12);
+ labeledCounters.GetCounters()[1].Set(10);
+ PQCounters.Apply(2, &labeledCounters);
+ // SUM 33
+ // MAX 100
+ }
+
+ {
+ NKikimr::TTabletLabeledCountersBase labeledCounters(namesN, &names[0], &types[0], &functions[0],
+ "some_stream", &groupNames[0], 1, "/Root/PQ2");
+ labeledCounters.GetCounters()[0].Set(20);
+ labeledCounters.GetCounters()[1].Set(1);
+ PQCounters.Apply(0, &labeledCounters);
+ labeledCounters.GetCounters()[0].Set(21);
+ labeledCounters.GetCounters()[1].Set(11);
+ PQCounters.Apply(1, &labeledCounters);
+ labeledCounters.GetCounters()[0].Set(22);
+ labeledCounters.GetCounters()[1].Set(10);
+ PQCounters.Apply(2, &labeledCounters);
+ // SUM 63
+ // MAX 11
+ }
+
+ NKikimr::NSysView::TDbServiceCounters counters;
+
+ // Here we check that consequent calls do not interfere
+ for (int i = 10; i >= 0; --i) {
+ PQCounters.ToProto(counters);
+
+ auto pqCounters = counters.FindOrAddLabeledCounters("some_stream");
+ UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().group(), "some_stream");
+ UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().delimiter(), "|");
+ UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().GetLabeledCounter().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().GetLabeledCounter(0).value(), 63);
+ UNIT_ASSERT_VALUES_EQUAL(pqCounters->GetAggregatedPerTablets().GetLabeledCounter(1).value(), 11);
+
+ PQCounters.FromProto(counters);
+ }
+ }
}
}
diff --git a/ydb/core/tablet/tablet_counters_protobuf.h b/ydb/core/tablet/tablet_counters_protobuf.h
index cc296d9ae48..ea707420e26 100644
--- a/ydb/core/tablet/tablet_counters_protobuf.h
+++ b/ydb/core/tablet/tablet_counters_protobuf.h
@@ -653,7 +653,7 @@ public:
SimpleOpts()->GetAggregateFuncs(), group, SimpleOpts()->GetGroupNames(), id, databasePath)
{
TVector<TString> groups;
- StringSplitter(group).Split('|').SkipEmpty().Collect(&groups);
+ StringSplitter(group).Split('|').Collect(&groups);
Y_VERIFY(SimpleOpts()->GetGroupNamesSize() == groups.size());
}
diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp
index 1786eaeceb6..a940ed2083a 100644
--- a/ydb/core/testlib/basics/appdata.cpp
+++ b/ydb/core/testlib/basics/appdata.cpp
@@ -197,5 +197,4 @@ namespace NKikimr {
{
FeatureFlags.SetEnableDbCounters(value);
}
-
}
diff --git a/ydb/core/testlib/tablet_helpers.cpp b/ydb/core/testlib/tablet_helpers.cpp
index f671b629a3f..41d4f3a391f 100644
--- a/ydb/core/testlib/tablet_helpers.cpp
+++ b/ydb/core/testlib/tablet_helpers.cpp
@@ -1210,6 +1210,8 @@ namespace NKikimr {
bootstrapperActorId = Boot(ctx, type, &NSequenceShard::CreateSequenceShard, DataGroupErasure);
} else if (type == TTabletTypes::ReplicationController) {
bootstrapperActorId = Boot(ctx, type, &NReplication::CreateController, DataGroupErasure);
+ } else if (type == TTabletTypes::PersQueue) {
+ bootstrapperActorId = Boot(ctx, type, &CreatePersQueue, DataGroupErasure);
} else {
status = NKikimrProto::ERROR;
}
diff --git a/ydb/core/testlib/tenant_runtime.cpp b/ydb/core/testlib/tenant_runtime.cpp
index 292c66d3da5..78d5d2ee455 100644
--- a/ydb/core/testlib/tenant_runtime.cpp
+++ b/ydb/core/testlib/tenant_runtime.cpp
@@ -23,6 +23,7 @@
#include <ydb/core/tx/tx_allocator/txallocator.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/sys_view/processor/processor.h>
+#include <ydb/core/persqueue/pq.h>
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/interconnect/interconnect.h>
@@ -432,6 +433,8 @@ class TFakeHive : public TActor<TFakeHive>, public TTabletExecutedFlat {
bootstrapperActorId = Boot(ctx, type, &NSequenceShard::CreateSequenceShard, DataGroupErasure);
} else if (type == TTabletTypes::ReplicationController) {
bootstrapperActorId = Boot(ctx, type, &NReplication::CreateController, DataGroupErasure);
+ } else if (type == TTabletTypes::PersQueue) {
+ bootstrapperActorId = Boot(ctx, type, &NKikimr::CreatePersQueue, DataGroupErasure);
} else {
status = NKikimrProto::ERROR;
}
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index e29737a1921..c160e2f2b21 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -110,8 +110,6 @@ namespace NKikimr {
namespace Tests {
-
-
TServerSettings& TServerSettings::SetDomainName(const TString& value) {
StoragePoolTypes.erase("test");
DomainName = value;
@@ -425,6 +423,56 @@ namespace Tests {
app.AddDomain(domain.Release());
}
+ TVector<ui64> TServer::StartPQTablets(ui32 pqTabletsN) {
+ auto getChannelBind = [](const TString& storagePool) {
+ TChannelBind bind;
+ bind.SetStoragePoolName(storagePool);
+ return bind;
+ };
+ TVector<ui64> ids;
+ ids.reserve(pqTabletsN);
+ for (ui32 i = 0; i < pqTabletsN; ++i) {
+ auto tabletId = Tests::ChangeStateStorage(Tests::DummyTablet2 + i + 1, Settings->Domain);
+ TIntrusivePtr<TTabletStorageInfo> tabletInfo =
+ CreateTestTabletInfo(tabletId, TTabletTypes::PersQueue);
+ TIntrusivePtr<TTabletSetupInfo> setupInfo =
+ new TTabletSetupInfo(&CreatePersQueue, TMailboxType::Simple, 0, TMailboxType::Simple, 0);
+
+ static TString STORAGE_POOL = "/Root:test";
+ static TChannelsBindings BINDED_CHANNELS =
+ {getChannelBind(STORAGE_POOL), getChannelBind(STORAGE_POOL), getChannelBind(STORAGE_POOL)};
+
+ ui32 nodeIndex = 0;
+ auto ev =
+ MakeHolder<TEvHive::TEvCreateTablet>(tabletId, 0, TTabletTypes::PersQueue, BINDED_CHANNELS);
+
+ TActorId senderB = Runtime->AllocateEdgeActor(nodeIndex);
+ ui64 hive = ChangeStateStorage(Tests::Hive, Settings->Domain);
+ Runtime->SendToPipe(hive, senderB, ev.Release(), 0, GetPipeConfigWithRetries());
+ TAutoPtr<IEventHandle> handle;
+ auto createTabletReply = Runtime->GrabEdgeEventRethrow<TEvHive::TEvCreateTabletReply>(handle);
+ UNIT_ASSERT(createTabletReply);
+ auto expectedStatus = NKikimrProto::OK;
+ UNIT_ASSERT_EQUAL_C(createTabletReply->Record.GetStatus(), expectedStatus,
+ (ui32)createTabletReply->Record.GetStatus() << " != " << (ui32)expectedStatus);
+ UNIT_ASSERT_EQUAL_C(createTabletReply->Record.GetOwner(), tabletId,
+ createTabletReply->Record.GetOwner() << " != " << tabletId);
+ ui64 id = createTabletReply->Record.GetTabletID();
+ while (true) {
+ auto tabletCreationResult =
+ Runtime->GrabEdgeEventRethrow<TEvHive::TEvTabletCreationResult>(handle);
+ UNIT_ASSERT(tabletCreationResult);
+ if (id == tabletCreationResult->Record.GetTabletID()) {
+ UNIT_ASSERT_EQUAL_C(tabletCreationResult->Record.GetStatus(), NKikimrProto::OK,
+ (ui32)tabletCreationResult->Record.GetStatus() << " != " << (ui32)NKikimrProto::OK);
+ break;
+ }
+ }
+ ids.push_back(id);
+ }
+ return ids;
+ }
+
void TServer::CreateBootstrapTablets() {
const ui32 domainId = Settings->Domain;
Y_VERIFY(TDomainsInfo::MakeTxAllocatorIDFixed(domainId, 1) == ChangeStateStorage(TxAllocator, domainId));
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index c7ce2bbb7c2..4bf212ca1db 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -164,6 +164,7 @@ namespace Tests {
TServerSettings& SetFeatureFlags(const NKikimrConfig::TFeatureFlags& value) { FeatureFlags = value; return *this; }
TServerSettings& SetCompactionConfig(const NKikimrConfig::TCompactionConfig& value) { CompactionConfig = value; return *this; }
TServerSettings& SetEnableDbCounters(bool value) { FeatureFlags.SetEnableDbCounters(value); return *this; }
+ TServerSettings& SetEnablePersistentQueryStats(bool value) { FeatureFlags.SetEnablePersistentQueryStats(value); return *this; }
TServerSettings& SetEnableYq(bool value) { EnableYq = value; return *this; }
TServerSettings& SetKeepSnapshotTimeout(TDuration value) { KeepSnapshotTimeout = value; return *this; }
TServerSettings& SetChangesQueueItemsLimit(ui64 value) { ChangesQueueItemsLimit = value; return *this; }
@@ -257,6 +258,7 @@ namespace Tests {
}
}
void StartDummyTablets();
+ TVector<ui64> StartPQTablets(ui32 pqTabletsN);
TTestActorRuntime* GetRuntime() const;
const TServerSettings& GetSettings() const;
const NScheme::TTypeRegistry* GetTypeRegistry();