diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2024-08-17 21:56:39 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-17 21:56:39 +0500 |
commit | 43d45e9a6a3fd3478f1204a939ad5357152c9b12 (patch) | |
tree | b48fe556105eac3f4bf3693f3cce6f1770bce29d | |
parent | a61f9c3a007b65acb5144c3ff1aff7f05c9d4a3d (diff) | |
download | ydb-43d45e9a6a3fd3478f1204a939ad5357152c9b12.tar.gz |
Fix internal PQRB state (#7792) (#7833)
-rw-r--r-- | ydb/core/persqueue/read_balancer__balancing.cpp | 29 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer__balancing_app.cpp | 10 |
2 files changed, 25 insertions, 14 deletions
diff --git a/ydb/core/persqueue/read_balancer__balancing.cpp b/ydb/core/persqueue/read_balancer__balancing.cpp index 43eb834a28e..3cb751e1802 100644 --- a/ydb/core/persqueue/read_balancer__balancing.cpp +++ b/ydb/core/persqueue/read_balancer__balancing.cpp @@ -341,7 +341,7 @@ void TPartitionFamily::AttachePartitions(const std::vector<ui32>& partitions, co } auto [activePartitionCount, inactivePartitionCount] = ClassifyPartitions(newPartitions); - ChangePartitionCounters(activePartitionCount, activePartitionCount); + ChangePartitionCounters(activePartitionCount, inactivePartitionCount); if (IsActive()) { if (!Session->AllPartitionsReadable(newPartitions)) { @@ -391,7 +391,7 @@ void TPartitionFamily::InactivatePartition(ui32 partitionId) { ActivePartitionCount += active; InactivePartitionCount += inactive; - if (IsActive()) { + if (IsActive() && Session) { Session->ActivePartitionCount += active; Session->InactivePartitionCount += inactive; } @@ -709,13 +709,13 @@ bool TConsumer::BreakUpFamily(TPartitionFamily* family, ui32 partitionId, bool d } std::vector<ui32> members; - GetPartitionGraph().Travers(id, [&](auto childId) { if (partitions.contains(childId)) { - members.push_back(childId); auto [_, i] = processedPartitions.insert(childId); if (!i) { familiesIntersect = true; + } else { + members.push_back(childId); } return true; @@ -723,16 +723,25 @@ bool TConsumer::BreakUpFamily(TPartitionFamily* family, ui32 partitionId, bool d return false; }); - auto* f = CreateFamily({id}, family->Status, ctx); - f->Partitions.insert(f->Partitions.end(), members.begin(), members.end()); + bool locked = family->Session && (family->LockedPartitions.contains(id) || + std::any_of(members.begin(), members.end(), [family](auto id) { return family->LockedPartitions.contains(id); })); + auto* f = CreateFamily({id}, locked ? family->Status : TPartitionFamily::EStatus::Free, ctx); f->TargetStatus = family->TargetStatus; - f->Session = family->Session; - f->LockedPartitions = Intercept(family->LockedPartitions, f->Partitions); + f->Partitions.insert(f->Partitions.end(), members.begin(), members.end()); f->LastPipe = family->LastPipe; - if (f->Session) { + f->UpdatePartitionMapping(f->Partitions); + f->ClassifyPartitions(); + if (locked) { + f->LockedPartitions = Intercept(family->LockedPartitions, f->Partitions); + + f->Session = family->Session; f->Session->Families.try_emplace(f->Id, f); + f->Session->ActivePartitionCount += f->ActivePartitionCount; + f->Session->InactivePartitionCount += f->InactivePartitionCount; if (f->IsActive()) { ++f->Session->ActiveFamilyCount; + } else if (f->IsRelesing()) { + ++f->Session->ReleasingFamilyCount; } } @@ -1299,7 +1308,7 @@ void TConsumer::Balance(const TActorContext& ctx) { } } - if (session->ActiveFamilyCount > desiredFamilyCount) { + if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) { --allowPlusOne; } } diff --git a/ydb/core/persqueue/read_balancer__balancing_app.cpp b/ydb/core/persqueue/read_balancer__balancing_app.cpp index 74aaca039be..30b64fd1db2 100644 --- a/ydb/core/persqueue/read_balancer__balancing_app.cpp +++ b/ydb/core/persqueue/read_balancer__balancing_app.cpp @@ -171,8 +171,8 @@ void TBalancer::RenderApp(TStringStream& str) const { TABLEH() { } TABLEH() { str << "Id"; } TABLEH() { str << "Partitions"; } - TABLEH() { str << "<span alt=\"All / Active / Releasing\">Families</span>"; } - TABLEH() { str << "<span alt=\"Active / Inactive / Releasing\">Statistics</span>"; }; + TABLEH() { str << "<span title=\"All families / Active / Releasing\">Families</span>"; } + TABLEH() { str << "<span title=\"All partitions / Active / Inactive / Releasing\">Statistics</span>"; }; TABLEH() { str << "Client node"; } TABLEH() { str << "Proxy node"; } } @@ -203,7 +203,8 @@ void TBalancer::RenderApp(TStringStream& str) const { TABLED() { str << session->SessionName; } TABLED() { str << (session->Partitions.empty() ? "" : JoinRange(", ", session->Partitions.begin(), session->Partitions.end())); } TABLED() { str << session->Families.size() << " / " << session->ActiveFamilyCount << " / " << session->ReleasingFamilyCount; } - TABLED() { str << session->ActivePartitionCount << " / " << session->InactivePartitionCount << " / " << session->ReleasingPartitionCount; } + TABLED() { str << (session->ActivePartitionCount + session->InactivePartitionCount + session->ReleasingPartitionCount) + << " / " << session->ActivePartitionCount << " / " << session->InactivePartitionCount << " / " << session->ReleasingPartitionCount; } TABLED() { str << session->ClientNode; } TABLED() { str << session->ProxyNodeId; } } @@ -213,7 +214,8 @@ void TBalancer::RenderApp(TStringStream& str) const { TABLED() { str << "<strong>Total:</strong>"; } TABLED() { } TABLED() { str << familyAllCount << " / " << activeFamilyCount << " / " << releasingFamilyCount; } - TABLED() { str << activePartitionCount << " / " << inactivePartitionCount << " / " << releasingPartitionCount; } + TABLED() { str << (activePartitionCount + inactivePartitionCount + releasingPartitionCount) << " / " << activePartitionCount << " / " + << inactivePartitionCount << " / " << releasingPartitionCount; } TABLED() { } TABLED() { } } |