diff options
author | FloatingCrowbar <komels@ydb.tech> | 2024-12-18 14:04:31 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-18 11:04:31 +0000 |
commit | dda6b209fc1a89c124607e47564e594cf6347b9d (patch) | |
tree | 35706c0cfb7e903a61d32c7e1b8f4b2c2a4cb598 | |
parent | 722f9c51cd7a2a9a7cba7f8a7d8832630e6ce635 (diff) | |
download | ydb-dda6b209fc1a89c124607e47564e594cf6347b9d.tar.gz |
PQRB test for counters (#12711)
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.cpp | 11 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.h | 6 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/counters_ut.cpp | 103 |
3 files changed, 115 insertions, 5 deletions
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index 02cafb39ec2..460431b1094 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -190,12 +190,14 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset, } void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId, - TTestContext& context, const bool requireAuth, bool kill) { - PQBalancerPrepare(topic, map, ssId, *context.Runtime, context.BalancerTabletId, context.Edge, requireAuth, kill); + TTestContext& context, const bool requireAuth, bool kill, const THashSet<TString>& xtraConsumers) { + PQBalancerPrepare(topic, map, ssId, *context.Runtime, context.BalancerTabletId, context.Edge, requireAuth, kill, + xtraConsumers); } void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::pair<ui64, ui32>>>& map, const ui64 ssId, - TTestActorRuntime& runtime, ui64 balancerTabletId, TActorId edge, const bool requireAuth, bool kill) { + TTestActorRuntime& runtime, ui64 balancerTabletId, TActorId edge, const bool requireAuth, bool kill, + const THashSet<TString>& xtraConsumers) { TAutoPtr<IEventHandle> handle; static int version = 0; ++version; @@ -227,6 +229,9 @@ void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::p request->Record.SetPath("/Root/" + topic); request->Record.SetSchemeShardId(ssId); request->Record.MutableTabletConfig()->AddReadRules("client"); + for (const auto& c : xtraConsumers) { + request->Record.MutableTabletConfig()->AddReadRules(c); + }; request->Record.MutableTabletConfig()->SetRequireAuthWrite(requireAuth); request->Record.MutableTabletConfig()->SetRequireAuthRead(requireAuth); diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 7bd5667ecab..5430facd8bf 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -277,7 +277,8 @@ void PQBalancerPrepare( ui64 tabletId, TActorId edge, const bool requireAuth = false, - bool kill = true); + bool kill = true, + const THashSet<TString>& xtraConsumers = {}); void PQTabletRestart( TTestActorRuntime& runtime, @@ -300,7 +301,8 @@ void PQBalancerPrepare( const ui64 ssId, TTestContext& context, const bool requireAuth = false, - bool kill = true); + bool kill = true, + const THashSet<TString>& xtraConsumers = {}); void PQTabletRestart(TTestContext& context); diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index a8f20cb2056..f4706a5b671 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -559,6 +559,109 @@ Y_UNIT_TEST(ImportantFlagSwitching) { }); } +Y_UNIT_TEST(NewConsumersCountersAppear) { + TTestContext tc; + tc.InitialEventsFilter.Prepare(); + + TFinalizer finalizer(tc); + bool activeZone = false; + bool dbRegistered{false}; + bool labeledCountersReceived =false ; + + tc.Prepare("", [](TTestActorRuntime&) {}, activeZone, true, true, true); + + tc.Runtime->SetScheduledLimit(5000); + + tc.Runtime->SetObserverFunc([&](TAutoPtr<IEventHandle>& event) { + if (event->GetTypeRewrite() == NSysView::TEvSysView::EvRegisterDbCounters) { + auto database = event.Get()->Get<NSysView::TEvSysView::TEvRegisterDbCounters>()->Database; + UNIT_ASSERT_VALUES_EQUAL(database, "/Root/PQ"); + dbRegistered = true; + } else if (event->GetTypeRewrite() == TEvTabletCounters::EvTabletAddLabeledCounters) { + labeledCountersReceived = true; + } + return TTestActorRuntime::DefaultObserverFunc(event); + }); + PQTabletPrepare({.deleteTime=3600, .writeSpeed = 100_KB, .meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS}, {{"client", true}}, tc); + TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()}; + ui64 ssId = 325; + BootFakeSchemeShard(*tc.Runtime, ssId, state); + + PQBalancerPrepare("topic", {{0, {tc.TabletId, 1}}}, ssId, tc, false, false, {"user1", "user2"}); + + IActor* actor = CreateTabletCountersAggregator(false); + auto aggregatorId = tc.Runtime->Register(actor); + tc.Runtime->EnableScheduleForActor(aggregatorId); + + CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true); + CmdWrite(0, "sourceid1", TestData(), tc, false); + CmdWrite(0, "sourceid2", TestData(), tc, false); + PQGetPartInfo(0, 30, tc); + + { + NSchemeCache::TDescribeResult::TPtr result = new NSchemeCache::TDescribeResult{}; + result->SetPath("/Root"); + TVector<TString> attrs = {"folder_id", "cloud_id", "database_id"}; + for (auto& attr : attrs) { + auto ua = result->MutablePathDescription()->AddUserAttributes(); + ua->SetKey(attr); + ua->SetValue(attr); + } + NSchemeCache::TDescribeResult::TCPtr cres = result; + auto event = MakeHolder<TEvTxProxySchemeCache::TEvWatchNotifyUpdated>(0, "/Root", TPathId{}, cres); + TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.BalancerTabletId, tc.Edge, 0, GetPipeConfigWithRetries()); + tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, event.Release(), 0, GetPipeConfigWithRetries(), pipeClient); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvTxProxySchemeCache::EvWatchNotifyUpdated); + auto processedCountersEvent = tc.Runtime->DispatchEvents(options); + UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true); + } + { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvPersQueue::EvPeriodicTopicStats); + auto processedCountersEvent = tc.Runtime->DispatchEvents(options); + UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true); + } + + { + auto counters = tc.Runtime->GetAppData(0).Counters; + auto dbGroup = GetServiceCounters(counters, "topics_serverless", false); + + auto group = dbGroup->GetSubgroup("host", "") + ->GetSubgroup("database", "/Root") + ->GetSubgroup("cloud_id", "cloud_id") + ->GetSubgroup("folder_id", "folder_id") + ->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic"); + for (const auto& user : {"client", "user1", "user2"}) { + auto consumerSG = group->FindSubgroup("consumer", user); + UNIT_ASSERT_C(consumerSG, user); + } + } + PQBalancerPrepare("topic", {{0, {tc.TabletId, 1}}}, ssId, tc, false, false, {"user3", "user2"}); + { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvPersQueue::EvPeriodicTopicStats); + auto processedCountersEvent = tc.Runtime->DispatchEvents(options); + UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true); + } + + { + auto counters = tc.Runtime->GetAppData(0).Counters; + auto dbGroup = GetServiceCounters(counters, "topics_serverless", false); + + auto group = dbGroup->GetSubgroup("host", "") + ->GetSubgroup("database", "/Root") + ->GetSubgroup("cloud_id", "cloud_id") + ->GetSubgroup("folder_id", "folder_id") + ->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic"); + for (const auto& user : {"user2", "user3"}) { + auto consumerSG = group->FindSubgroup("consumer", user); + UNIT_ASSERT_C(consumerSG, user); + } + } +} + } // Y_UNIT_TEST_SUITE(PQCountersLabeled) Y_UNIT_TEST_SUITE(TMultiBucketCounter) { |