aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-03-15 16:30:12 +0300
committertesseract <tesseract@yandex-team.com>2023-03-15 16:30:12 +0300
commitbd1aa575a8eff44e5f6b7d58d7d4dedb931c6112 (patch)
treec2d211bcf886354bab3017fef7a6492c942b2943
parent85fddb28e49cbc19fa01983cc3663a3b7d49c0cb (diff)
downloadydb-bd1aa575a8eff44e5f6b7d58d7d4dedb931c6112.tar.gz
Персистить SubDomainId в ReadBalancer-е
-rw-r--r--ydb/core/persqueue/read_balancer.cpp36
-rw-r--r--ydb/core/persqueue/read_balancer.h10
-rw-r--r--ydb/core/protos/pqconfig.proto3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h4
-rw-r--r--ydb/core/tx/schemeshard/ut_stats.cpp6
-rw-r--r--ydb/core/tx/schemeshard/ut_subdomain.cpp8
6 files changed, 63 insertions, 4 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index e521e34232..f692d87b4e 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -49,6 +49,11 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
Self->SchemeShardId = dataRowset.GetValueOrDefault<Schema::Data::SchemeShardId>(0);
Self->NextPartitionId = dataRowset.GetValueOrDefault<Schema::Data::NextPartitionId>(0);
+ ui64 subDomainPathId = dataRowset.GetValueOrDefault<Schema::Data::SubDomainPathId>(0);
+ if (subDomainPathId) {
+ Self->SubDomainPathId.emplace(Self->SchemeShardId, subDomainPathId);
+ }
+
TString config = dataRowset.GetValueOrDefault<Schema::Data::Config>("");
if (!config.empty()) {
bool res = Self->TabletConfig.ParseFromString(config);
@@ -139,7 +144,8 @@ bool TPersQueueReadBalancer::TTxWrite::Execute(TTransactionContext& txc, const T
NIceDb::TUpdate<Schema::Data::MaxPartsPerTablet>(Self->MaxPartsPerTablet),
NIceDb::TUpdate<Schema::Data::SchemeShardId>(Self->SchemeShardId),
NIceDb::TUpdate<Schema::Data::NextPartitionId>(Self->NextPartitionId),
- NIceDb::TUpdate<Schema::Data::Config>(config));
+ NIceDb::TUpdate<Schema::Data::Config>(config),
+ NIceDb::TUpdate<Schema::Data::SubDomainPathId>(Self->SubDomainPathId ? Self->SubDomainPathId->LocalPathId : 0));
for (auto& p : DeletedPartitions) {
db.Table<Schema::Partitions>().Key(p).Delete();
}
@@ -519,6 +525,9 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
ui32 prevNextPartitionId = NextPartitionId;
NextPartitionId = record.HasNextPartitionId() ? record.GetNextPartitionId() : 0;
THashMap<ui32, TPartitionInfo> partitionsInfo;
+ if (record.HasSubDomainPathId()) {
+ SubDomainPathId.emplace(record.GetSchemeShardId(), record.GetSubDomainPathId());
+ }
Consumers.clear();
for (const auto& rr : TabletConfig.GetReadRules()) {
@@ -605,6 +614,10 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
RebuildStructs();
Execute(new TTxWrite(this, std::move(deletedPartitions), std::move(newPartitions), std::move(newTablets), std::move(newGroups), std::move(reallocatedTablets)), ctx);
+
+ if (WatchingSubDomainPathId && SubDomainPathId && *WatchingSubDomainPathId != *SubDomainPathId) {
+ StartWatchingSubDomainPathId();
+ }
}
@@ -832,6 +845,7 @@ TEvPersQueue::TEvPeriodicTopicStats* TPersQueueReadBalancer::GetStatsEvent() {
rec.SetRound(++StatsReportRound);
rec.SetDataSize(AggregatedStats.TotalDataSize);
rec.SetUsedReserveSize(AggregatedStats.TotalUsedReserveSize);
+ rec.SetSubDomainOutOfSpace(SubDomainOutOfSpace);
return ev;
}
@@ -1342,6 +1356,25 @@ void TPersQueueReadBalancer::StopFindSubDomainPathId() {
}
}
+
+struct TTxWriteSubDomainPathId : public ITransaction {
+ TPersQueueReadBalancer* const Self;
+
+ TTxWriteSubDomainPathId(TPersQueueReadBalancer* self)
+ : Self(self)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) {
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<TPersQueueReadBalancer::Schema::Data>().Key(1).Update(
+ NIceDb::TUpdate<TPersQueueReadBalancer::Schema::Data::SubDomainPathId>(Self->SubDomainPathId->LocalPathId));
+ return true;
+ }
+
+ void Complete(const TActorContext&) {
+ }
+};
+
void TPersQueueReadBalancer::StartFindSubDomainPathId(bool delayFirstRequest) {
if (!FindSubDomainPathIdActor &&
SchemeShardId != 0 &&
@@ -1365,6 +1398,7 @@ void TPersQueueReadBalancer::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPa
"Discovered subdomain " << msg->LocalPathId << " at RB " << TabletID());
SubDomainPathId.emplace(msg->SchemeShardId, msg->LocalPathId);
+ Execute(new TTxWriteSubDomainPathId(this), ctx);
StartWatchingSubDomainPathId();
}
}
diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h
index fca831618c..c0728debbf 100644
--- a/ydb/core/persqueue/read_balancer.h
+++ b/ydb/core/persqueue/read_balancer.h
@@ -62,9 +62,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
struct MaxPartsPerTablet : Column<41, NScheme::NTypeIds::Uint32> {};
struct SchemeShardId : Column<42, NScheme::NTypeIds::Uint64> {};
struct NextPartitionId : Column<43, NScheme::NTypeIds::Uint64> {};
+ struct SubDomainPathId : Column<44, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<Key>;
- using TColumns = TableColumns<Key, PathId, Topic, Path, Version, Config, MaxPartsPerTablet, SchemeShardId, NextPartitionId>;
+ using TColumns = TableColumns<Key, PathId, Topic, Path, Version, Config, MaxPartsPerTablet, SchemeShardId, NextPartitionId, SubDomainPathId>;
};
struct Partitions : Table<33> {
@@ -227,7 +228,11 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
}
void InitDone(const TActorContext &ctx) {
- StartFindSubDomainPathId(false);
+ if (SubDomainPathId) {
+ StartWatchingSubDomainPathId();
+ } else {
+ StartFindSubDomainPathId(true);
+ }
StartPartitionIdForWrite = NextPartitionIdForWrite = rand() % TotalGroups;
@@ -498,6 +503,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
std::optional<TPathId> SubDomainPathId;
std::optional<TPathId> WatchingSubDomainPathId;
+ friend struct TTxWriteSubDomainPathId;
bool SubDomainOutOfSpace = false;
public:
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 45c5319e05..45a3e05ef9 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -390,6 +390,7 @@ message TUpdateBalancerConfig { //for schemeshard use only
}
repeated TTablet Tablets = 10;
+ optional uint64 SubDomainPathId = 13;
}
message TDescribe {
@@ -851,6 +852,8 @@ message TEvPeriodicTopicStats {
required uint64 DataSize = 4;
required uint64 UsedReserveSize = 5;
+
+ optional bool SubDomainOutOfSpace = 6;
};
message TEvSubDomainStatus {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index 3eb7ed8a4c..df3d99dccf 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -771,6 +771,10 @@ public:
}
}
+ if (const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId) {
+ event->Record.SetSubDomainPathId(subDomainPathId);
+ }
+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Propose configure PersQueueReadBalancer"
<< ", opId: " << OperationId
diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp
index 2139c6a3cb..7967f72c54 100644
--- a/ydb/core/tx/schemeshard/ut_stats.cpp
+++ b/ydb/core/tx/schemeshard/ut_stats.cpp
@@ -500,6 +500,10 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
ui64 balancerId = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetPersQueueGroup().GetBalancerTabletID();
+ auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
+ UNIT_ASSERT_EQUAL_C(0, stats->Record.GetDataSize(), "DataSize from ReadBalancer");
+ UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer");
+
ui32 msgSeqNo = 100;
WriteToTopic(runtime, topicPath, msgSeqNo, "Message 100");
@@ -507,7 +511,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
Assert(69, 0); // 69 - it is unstable value. it can change if internal message store change
- auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
+ stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
UNIT_ASSERT_EQUAL_C(69, stats->Record.GetDataSize(), "DataSize from ReadBalancer");
UNIT_ASSERT_EQUAL_C(0, stats->Record.GetUsedReserveSize(), "UsedReserveSize from ReadBalancer");
diff --git a/ydb/core/tx/schemeshard/ut_subdomain.cpp b/ydb/core/tx/schemeshard/ut_subdomain.cpp
index 417171e827..ec752a4d28 100644
--- a/ydb/core/tx/schemeshard/ut_subdomain.cpp
+++ b/ydb/core/tx/schemeshard/ut_subdomain.cpp
@@ -3172,6 +3172,11 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_1"),
{LsCheckDiskQuotaExceeded(false, "Topic was created")});
+
+ ui64 balancerId = DescribePath(runtime, "/MyRoot/USER_1/Topic1").GetPathDescription().GetPersQueueGroup().GetBalancerTabletID();
+
+ auto stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
+ UNIT_ASSERT_EQUAL_C(false, stats->Record.GetSubDomainOutOfSpace(), "SubDomainOutOfSpace from ReadBalancer");
ui32 seqNo = 100;
WriteToTopic(runtime, "/MyRoot/USER_1/Topic1", ++seqNo, "Message 0");
@@ -3180,6 +3185,9 @@ Y_UNIT_TEST_SUITE(TSchemeShardSubDomainTest) {
TestDescribeResult(DescribePath(runtime, "/MyRoot/USER_1"),
{LsCheckDiskQuotaExceeded(true, "Message 0 was written")});
+ stats = NPQ::GetReadBalancerPeriodicTopicStats(runtime, balancerId);
+ UNIT_ASSERT_EQUAL_C(true, stats->Record.GetSubDomainOutOfSpace(), "SubDomainOutOfSpace from ReadBalancer after write");
+
TestDropPQGroup(runtime, ++txId, "/MyRoot/USER_1", "Topic1");
env.TestWaitNotification(runtime, txId);
env.SimulateSleep(runtime, TDuration::Seconds(1));