diff options
author | tesseract <tesseract@yandex-team.com> | 2023-03-15 16:30:12 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-03-15 16:30:12 +0300 |
commit | bd1aa575a8eff44e5f6b7d58d7d4dedb931c6112 (patch) | |
tree | c2d211bcf886354bab3017fef7a6492c942b2943 | |
parent | 85fddb28e49cbc19fa01983cc3663a3b7d49c0cb (diff) | |
download | ydb-bd1aa575a8eff44e5f6b7d58d7d4dedb931c6112.tar.gz |
Персистить SubDomainId в ReadBalancer-е
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 36 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer.h | 10 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 3 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_common.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_stats.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_subdomain.cpp | 8 |
6 files changed, 63 insertions, 4 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index e521e34232..f692d87b4e 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -49,6 +49,11 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA Self->SchemeShardId = dataRowset.GetValueOrDefault<Schema::Data::SchemeShardId>(0); Self->NextPartitionId = dataRowset.GetValueOrDefault<Schema::Data::NextPartitionId>(0); + ui64 subDomainPathId = dataRowset.GetValueOrDefault<Schema::Data::SubDomainPathId>(0); + if (subDomainPathId) { + Self->SubDomainPathId.emplace(Self->SchemeShardId, subDomainPathId); + } + TString config = dataRowset.GetValueOrDefault<Schema::Data::Config>(""); if (!config.empty()) { bool res = Self->TabletConfig.ParseFromString(config); @@ -139,7 +144,8 @@ bool TPersQueueReadBalancer::TTxWrite::Execute(TTransactionContext& txc, const T NIceDb::TUpdate<Schema::Data::MaxPartsPerTablet>(Self->MaxPartsPerTablet), NIceDb::TUpdate<Schema::Data::SchemeShardId>(Self->SchemeShardId), NIceDb::TUpdate<Schema::Data::NextPartitionId>(Self->NextPartitionId), - NIceDb::TUpdate<Schema::Data::Config>(config)); + NIceDb::TUpdate<Schema::Data::Config>(config), + NIceDb::TUpdate<Schema::Data::SubDomainPathId>(Self->SubDomainPathId ? Self->SubDomainPathId->LocalPathId : 0)); for (auto& p : DeletedPartitions) { db.Table<Schema::Partitions>().Key(p).Delete(); } @@ -519,6 +525,9 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr ui32 prevNextPartitionId = NextPartitionId; NextPartitionId = record.HasNextPartitionId() ? record.GetNextPartitionId() : 0; THashMap<ui32, TPartitionInfo> partitionsInfo; + if (record.HasSubDomainPathId()) { + SubDomainPathId.emplace(record.GetSchemeShardId(), record.GetSubDomainPathId()); + } Consumers.clear(); for (const auto& rr : TabletConfig.GetReadRules()) { @@ -605,6 +614,10 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr RebuildStructs(); Execute(new TTxWrite(this, std::move(deletedPartitions), std::move(newPartitions), std::move(newTablets), std::move(newGroups), std::move(reallocatedTablets)), ctx); + + if (WatchingSubDomainPathId && SubDomainPathId && *WatchingSubDomainPathId != *SubDomainPathId) { + StartWatchingSubDomainPathId(); + } } @@ -832,6 +845,7 @@ TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() { rec.SetRound(++StatsReportRound); rec.SetDataSize(AggregatedStats.TotalDataSize); rec.SetUsedReserveSize(AggregatedStats.TotalUsedReserveSize); + rec.SetSubDomainOutOfSpace(SubDomainOutOfSpace); return ev; } @@ -1342,6 +1356,25 @@ void TPersQueueReadBalancer::StopFindSubDomainPathId() { } } + +struct TTxWriteSubDomainPathId : public ITransaction { + TPersQueueReadBalancer* const Self; + + TTxWriteSubDomainPathId(TPersQueueReadBalancer* self) + : Self(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) { + NIceDb::TNiceDb db(txc.DB); + db.Table<TPersQueueReadBalancer::Schema::Data>().Key(1).Update( + NIceDb::TUpdate<TPersQueueReadBalancer::Schema::Data::SubDomainPathId>(Self->SubDomainPathId->LocalPathId)); + return true; + } + + void Complete(const TActorContext&) { + } +}; + void TPersQueueReadBalancer::StartFindSubDomainPathId(bool delayFirstRequest) { if (!FindSubDomainPathIdActor && SchemeShardId != 0 && @@ -1365,6 +1398,7 @@ void TPersQueueReadBalancer::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPa "Discovered subdomain " << msg->LocalPathId << " at RB " << TabletID()); SubDomainPathId.emplace(msg->SchemeShardId, msg->LocalPathId); + Execute(new TTxWriteSubDomainPathId(this), ctx); StartWatchingSubDomainPathId(); } } diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index fca831618c..c0728debbf 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -62,9 +62,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa struct MaxPartsPerTablet : Column<41, NScheme::NTypeIds::Uint32> {}; struct SchemeShardId : Column<42, NScheme::NTypeIds::Uint64> {}; struct NextPartitionId : Column<43, NScheme::NTypeIds::Uint64> {}; + struct SubDomainPathId : Column<44, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<Key>; - using TColumns = TableColumns<Key, PathId, Topic, Path, Version, Config, MaxPartsPerTablet, SchemeShardId, NextPartitionId>; + using TColumns = TableColumns<Key, PathId, Topic, Path, Version, Config, MaxPartsPerTablet, SchemeShardId, NextPartitionId, SubDomainPathId>; }; struct Partitions : Table<33> { @@ -227,7 +228,11 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa } void InitDone(const TActorContext &ctx) { - StartFindSubDomainPathId(false); + if (SubDomainPathId) { + StartWatchingSubDomainPathId(); + } else { + StartFindSubDomainPathId(true); + } StartPartitionIdForWrite = NextPartitionIdForWrite = rand() % TotalGroups; @@ -498,6 +503,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa std::optional<TPathId> SubDomainPathId; std::optional<TPathId> WatchingSubDomainPathId; + friend struct TTxWriteSubDomainPathId; bool SubDomainOutOfSpace = false; public: diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 45c5319e05..45a3e05ef9 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -390,6 +390,7 @@ message TUpdateBalancerConfig { //for schemeshard use only } repeated TTablet Tablets = 10; + optional uint64 SubDomainPathId = 13; } message TDescribe { @@ -851,6 +852,8 @@ message TEvPeriodicTopicStats { required uint64 DataSize = 4; required uint64 UsedReserveSize = 5; + + optional bool SubDomainOutOfSpace = 6; }; message TEvSubDomainStatus { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 3eb7ed8a4c..df3d99dccf 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -771,6 +771,10 @@ public: } } + if (const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId) { + event->Record.SetSubDomainPathId(subDomainPathId); + } + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Propose configure PersQueueReadBalancer" << ", opId: " << OperationId diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp index 2139c6a3cb..7967f72c54 100644 --- a/ydb/core/tx/schemeshard/ut_stats.cpp +++ b/ydb/core/tx/schemeshard/ut_stats.cpp @@ -500,6 +500,10 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { ui64 balancerId = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetPersQueueGroup().GetBalancerTabletID(); + auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); + UNIT_ASSERT_EQUAL_C(0, stats->Record.GetDataSize(), "DataSize from ReadBalancer"); + UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer"); + ui32 msgSeqNo = 100; WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100"); @@ -507,7 +511,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { Assert(69, 0); // 69 - it is unstable value. it can change if internal message store change - auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); + stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer"); UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer"); diff --git a/ydb/core/tx/schemeshard/ut_subdomain.cpp b/ydb/core/tx/schemeshard/ut_subdomain.cpp index 417171e827..ec752a4d28 100644 --- a/ydb/core/tx/schemeshard/ut_subdomain.cpp +++ b/ydb/core/tx/schemeshard/ut_subdomain.cpp @@ -3172,6 +3172,11 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) { TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_1"), {LsCheckDiskQuotaExceeded(false, "Topic was created")}); + + ui64 balancerId = DescribePath(runtime, "/MyRoot/USER_1/Topic1").GetPathDescription().GetPersQueueGroup().GetBalancerTabletID(); + + auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); + UNIT_ASSERT_EQUAL_C(false, stats->Record.GetSubDomainOutOfSpace(), "SubDomainOutOfSpace from ReadBalancer"); ui32 seqNo = 100; WriteToTopic(runtime, "/MyRoot/USER_1/Topic1", ++seqNo, "Message 0"); @@ -3180,6 +3185,9 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) { TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_1"), {LsCheckDiskQuotaExceeded(true, "Message 0 was written")}); + stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId); + UNIT_ASSERT_EQUAL_C(true, stats->Record.GetSubDomainOutOfSpace(), "SubDomainOutOfSpace from ReadBalancer after write"); + TestDropPQGroup(runtime, ++txId, "/MyRoot/USER_1", "Topic1"); env.TestWaitNotification(runtime, txId); env.SimulateSleep(runtime, TDuration::Seconds(1)); |