aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFloatingCrowbar <komels@ydb.tech>2024-12-18 14:04:31 +0300
committerGitHub <noreply@github.com>2024-12-18 11:04:31 +0000
commitdda6b209fc1a89c124607e47564e594cf6347b9d (patch)
tree35706c0cfb7e903a61d32c7e1b8f4b2c2a4cb598
parent722f9c51cd7a2a9a7cba7f8a7d8832630e6ce635 (diff)
downloadydb-dda6b209fc1a89c124607e47564e594cf6347b9d.tar.gz
PQRB test for counters (#12711)
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp11
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h6
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp103
3 files changed, 115 insertions, 5 deletions
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index 02cafb39ec2..460431b1094 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -190,12 +190,14 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset,
}
void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId,
- TTestContext& context, const bool requireAuth, bool kill) {
- PQBalancerPrepare(topic, map, ssId, *context.Runtime, context.BalancerTabletId, context.Edge, requireAuth, kill);
+ TTestContext& context, const bool requireAuth, bool kill, const THashSet<TString>& xtraConsumers) {
+ PQBalancerPrepare(topic, map, ssId, *context.Runtime, context.BalancerTabletId, context.Edge, requireAuth, kill,
+ xtraConsumers);
}
void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId,
- TTestActorRuntime& runtime, ui64 balancerTabletId, TActorId edge, const bool requireAuth, bool kill) {
+ TTestActorRuntime& runtime, ui64 balancerTabletId, TActorId edge, const bool requireAuth, bool kill,
+ const THashSet<TString>& xtraConsumers) {
TAutoPtr<IEventHandle> handle;
static int version = 0;
++version;
@@ -227,6 +229,9 @@ void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::p
request->Record.SetPath("/Root/" + topic);
request->Record.SetSchemeShardId(ssId);
request->Record.MutableTabletConfig()->AddReadRules("client");
+ for (const auto& c : xtraConsumers) {
+ request->Record.MutableTabletConfig()->AddReadRules(c);
+ };
request->Record.MutableTabletConfig()->SetRequireAuthWrite(requireAuth);
request->Record.MutableTabletConfig()->SetRequireAuthRead(requireAuth);
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index 7bd5667ecab..5430facd8bf 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -277,7 +277,8 @@ void PQBalancerPrepare(
ui64 tabletId,
TActorId edge,
const bool requireAuth = false,
- bool kill = true);
+ bool kill = true,
+ const THashSet<TString>& xtraConsumers = {});
void PQTabletRestart(
TTestActorRuntime& runtime,
@@ -300,7 +301,8 @@ void PQBalancerPrepare(
const ui64 ssId,
TTestContext& context,
const bool requireAuth = false,
- bool kill = true);
+ bool kill = true,
+ const THashSet<TString>& xtraConsumers = {});
void PQTabletRestart(TTestContext& context);
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index a8f20cb2056..f4706a5b671 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -559,6 +559,109 @@ Y_UNIT_TEST(ImportantFlagSwitching) {
});
}
+Y_UNIT_TEST(NewConsumersCountersAppear) {
+ TTestContext tc;
+ tc.InitialEventsFilter.Prepare();
+
+ TFinalizer finalizer(tc);
+ bool activeZone = false;
+ bool dbRegistered{false};
+ bool labeledCountersReceived =false ;
+
+ tc.Prepare("", [](TTestActorRuntime&) {}, activeZone, true, true, true);
+
+ tc.Runtime->SetScheduledLimit(5000);
+
+ tc.Runtime->SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
+ if (event->GetTypeRewrite() == NSysView::TEvSysView::EvRegisterDbCounters) {
+ auto database = event.Get()->Get<NSysView::TEvSysView::TEvRegisterDbCounters>()->Database;
+ UNIT_ASSERT_VALUES_EQUAL(database, "/Root/PQ");
+ dbRegistered = true;
+ } else if (event->GetTypeRewrite() == TEvTabletCounters::EvTabletAddLabeledCounters) {
+ labeledCountersReceived = true;
+ }
+ return TTestActorRuntime::DefaultObserverFunc(event);
+ });
+ PQTabletPrepare({.deleteTime=3600, .writeSpeed = 100_KB, .meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS}, {{"client", true}}, tc);
+ TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
+ ui64 ssId = 325;
+ BootFakeSchemeShard(*tc.Runtime, ssId, state);
+
+ PQBalancerPrepare("topic", {{0, {tc.TabletId, 1}}}, ssId, tc, false, false, {"user1", "user2"});
+
+ IActor* actor = CreateTabletCountersAggregator(false);
+ auto aggregatorId = tc.Runtime->Register(actor);
+ tc.Runtime->EnableScheduleForActor(aggregatorId);
+
+ CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
+ CmdWrite(0, "sourceid1", TestData(), tc, false);
+ CmdWrite(0, "sourceid2", TestData(), tc, false);
+ PQGetPartInfo(0, 30, tc);
+
+ {
+ NSchemeCache::TDescribeResult::TPtr result = new NSchemeCache::TDescribeResult{};
+ result->SetPath("/Root");
+ TVector<TString> attrs = {"folder_id", "cloud_id", "database_id"};
+ for (auto& attr : attrs) {
+ auto ua = result->MutablePathDescription()->AddUserAttributes();
+ ua->SetKey(attr);
+ ua->SetValue(attr);
+ }
+ NSchemeCache::TDescribeResult::TCPtr cres = result;
+ auto event = MakeHolder<TEvTxProxySchemeCache::TEvWatchNotifyUpdated>(0, "/Root", TPathId{}, cres);
+ TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.BalancerTabletId, tc.Edge, 0, GetPipeConfigWithRetries());
+ tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, event.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
+
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvTxProxySchemeCache::EvWatchNotifyUpdated);
+ auto processedCountersEvent = tc.Runtime->DispatchEvents(options);
+ UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true);
+ }
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvPersQueue::EvPeriodicTopicStats);
+ auto processedCountersEvent = tc.Runtime->DispatchEvents(options);
+ UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true);
+ }
+
+ {
+ auto counters = tc.Runtime->GetAppData(0).Counters;
+ auto dbGroup = GetServiceCounters(counters, "topics_serverless", false);
+
+ auto group = dbGroup->GetSubgroup("host", "")
+ ->GetSubgroup("database", "/Root")
+ ->GetSubgroup("cloud_id", "cloud_id")
+ ->GetSubgroup("folder_id", "folder_id")
+ ->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic");
+ for (const auto& user : {"client", "user1", "user2"}) {
+ auto consumerSG = group->FindSubgroup("consumer", user);
+ UNIT_ASSERT_C(consumerSG, user);
+ }
+ }
+ PQBalancerPrepare("topic", {{0, {tc.TabletId, 1}}}, ssId, tc, false, false, {"user3", "user2"});
+ {
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvPersQueue::EvPeriodicTopicStats);
+ auto processedCountersEvent = tc.Runtime->DispatchEvents(options);
+ UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true);
+ }
+
+ {
+ auto counters = tc.Runtime->GetAppData(0).Counters;
+ auto dbGroup = GetServiceCounters(counters, "topics_serverless", false);
+
+ auto group = dbGroup->GetSubgroup("host", "")
+ ->GetSubgroup("database", "/Root")
+ ->GetSubgroup("cloud_id", "cloud_id")
+ ->GetSubgroup("folder_id", "folder_id")
+ ->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic");
+ for (const auto& user : {"user2", "user3"}) {
+ auto consumerSG = group->FindSubgroup("consumer", user);
+ UNIT_ASSERT_C(consumerSG, user);
+ }
+ }
+}
+
} // Y_UNIT_TEST_SUITE(PQCountersLabeled)
Y_UNIT_TEST_SUITE(TMultiBucketCounter) {