aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2024-08-17 21:56:39 +0500
committerGitHub <noreply@github.com>2024-08-17 21:56:39 +0500
commit43d45e9a6a3fd3478f1204a939ad5357152c9b12 (patch)
treeb48fe556105eac3f4bf3693f3cce6f1770bce29d
parenta61f9c3a007b65acb5144c3ff1aff7f05c9d4a3d (diff)
downloadydb-43d45e9a6a3fd3478f1204a939ad5357152c9b12.tar.gz
Fix internal PQRB state (#7792) (#7833)
-rw-r--r--ydb/core/persqueue/read_balancer__balancing.cpp29
-rw-r--r--ydb/core/persqueue/read_balancer__balancing_app.cpp10
2 files changed, 25 insertions, 14 deletions
diff --git a/ydb/core/persqueue/read_balancer__balancing.cpp b/ydb/core/persqueue/read_balancer__balancing.cpp
index 43eb834a28e..3cb751e1802 100644
--- a/ydb/core/persqueue/read_balancer__balancing.cpp
+++ b/ydb/core/persqueue/read_balancer__balancing.cpp
@@ -341,7 +341,7 @@ void TPartitionFamily::AttachePartitions(const std::vector<ui32>& partitions, co
}
auto [activePartitionCount, inactivePartitionCount] = ClassifyPartitions(newPartitions);
- ChangePartitionCounters(activePartitionCount, activePartitionCount);
+ ChangePartitionCounters(activePartitionCount, inactivePartitionCount);
if (IsActive()) {
if (!Session->AllPartitionsReadable(newPartitions)) {
@@ -391,7 +391,7 @@ void TPartitionFamily::InactivatePartition(ui32 partitionId) {
ActivePartitionCount += active;
InactivePartitionCount += inactive;
- if (IsActive()) {
+ if (IsActive() && Session) {
Session->ActivePartitionCount += active;
Session->InactivePartitionCount += inactive;
}
@@ -709,13 +709,13 @@ bool TConsumer::BreakUpFamily(TPartitionFamily* family, ui32 partitionId, bool d
}
std::vector<ui32> members;
-
GetPartitionGraph().Travers(id, [&](auto childId) {
if (partitions.contains(childId)) {
- members.push_back(childId);
auto [_, i] = processedPartitions.insert(childId);
if (!i) {
familiesIntersect = true;
+ } else {
+ members.push_back(childId);
}
return true;
@@ -723,16 +723,25 @@ bool TConsumer::BreakUpFamily(TPartitionFamily* family, ui32 partitionId, bool d
return false;
});
- auto* f = CreateFamily({id}, family->Status, ctx);
- f->Partitions.insert(f->Partitions.end(), members.begin(), members.end());
+ bool locked = family->Session && (family->LockedPartitions.contains(id) ||
+ std::any_of(members.begin(), members.end(), [family](auto id) { return family->LockedPartitions.contains(id); }));
+ auto* f = CreateFamily({id}, locked ? family->Status : TPartitionFamily::EStatus::Free, ctx);
f->TargetStatus = family->TargetStatus;
- f->Session = family->Session;
- f->LockedPartitions = Intercept(family->LockedPartitions, f->Partitions);
+ f->Partitions.insert(f->Partitions.end(), members.begin(), members.end());
f->LastPipe = family->LastPipe;
- if (f->Session) {
+ f->UpdatePartitionMapping(f->Partitions);
+ f->ClassifyPartitions();
+ if (locked) {
+ f->LockedPartitions = Intercept(family->LockedPartitions, f->Partitions);
+
+ f->Session = family->Session;
f->Session->Families.try_emplace(f->Id, f);
+ f->Session->ActivePartitionCount += f->ActivePartitionCount;
+ f->Session->InactivePartitionCount += f->InactivePartitionCount;
if (f->IsActive()) {
++f->Session->ActiveFamilyCount;
+ } else if (f->IsRelesing()) {
+ ++f->Session->ReleasingFamilyCount;
}
}
@@ -1299,7 +1308,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
}
}
- if (session->ActiveFamilyCount > desiredFamilyCount) {
+ if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) {
--allowPlusOne;
}
}
diff --git a/ydb/core/persqueue/read_balancer__balancing_app.cpp b/ydb/core/persqueue/read_balancer__balancing_app.cpp
index 74aaca039be..30b64fd1db2 100644
--- a/ydb/core/persqueue/read_balancer__balancing_app.cpp
+++ b/ydb/core/persqueue/read_balancer__balancing_app.cpp
@@ -171,8 +171,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
TABLEH() { }
TABLEH() { str << "Id"; }
TABLEH() { str << "Partitions"; }
- TABLEH() { str << "<span alt=\"All / Active / Releasing\">Families</span>"; }
- TABLEH() { str << "<span alt=\"Active / Inactive / Releasing\">Statistics</span>"; };
+ TABLEH() { str << "<span title=\"All families / Active / Releasing\">Families</span>"; }
+ TABLEH() { str << "<span title=\"All partitions / Active / Inactive / Releasing\">Statistics</span>"; };
TABLEH() { str << "Client node"; }
TABLEH() { str << "Proxy node"; }
}
@@ -203,7 +203,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
TABLED() { str << session->SessionName; }
TABLED() { str << (session->Partitions.empty() ? "" : JoinRange(", ", session->Partitions.begin(), session->Partitions.end())); }
TABLED() { str << session->Families.size() << " / " << session->ActiveFamilyCount << " / " << session->ReleasingFamilyCount; }
- TABLED() { str << session->ActivePartitionCount << " / " << session->InactivePartitionCount << " / " << session->ReleasingPartitionCount; }
+ TABLED() { str << (session->ActivePartitionCount + session->InactivePartitionCount + session->ReleasingPartitionCount)
+ << " / " << session->ActivePartitionCount << " / " << session->InactivePartitionCount << " / " << session->ReleasingPartitionCount; }
TABLED() { str << session->ClientNode; }
TABLED() { str << session->ProxyNodeId; }
}
@@ -213,7 +214,8 @@ void TBalancer::RenderApp(TStringStream& str) const {
TABLED() { str << "<strong>Total:</strong>"; }
TABLED() { }
TABLED() { str << familyAllCount << " / " << activeFamilyCount << " / " << releasingFamilyCount; }
- TABLED() { str << activePartitionCount << " / " << inactivePartitionCount << " / " << releasingPartitionCount; }
+ TABLED() { str << (activePartitionCount + inactivePartitionCount + releasingPartitionCount) << " / " << activePartitionCount << " / "
+ << inactivePartitionCount << " / " << releasingPartitionCount; }
TABLED() { }
TABLED() { }
}