diff options
author | tesseract <tesseract@yandex-team.com> | 2023-04-06 22:28:53 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-04-06 22:28:53 +0300 |
commit | 042837d631ba8e9b65e4e1ba864b8efbbbaf073b (patch) | |
tree | 565aeec7989a7dabd76566e47ca1e01fb5431450 | |
parent | 25e969329b290c11487f74f566c225593e9661d4 (diff) | |
download | ydb-042837d631ba8e9b65e4e1ba864b8efbbbaf073b.tar.gz |
Simplify code
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 69 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer.h | 4 |
2 files changed, 30 insertions, 43 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 16e02cdd38c..52e4b074dda 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -1333,19 +1333,18 @@ void TPersQueueReadBalancer::UnregisterSession(const TActorId& pipe, const TActo auto jt = ClientsInfo.find(clientId); Y_VERIFY(jt != ClientsInfo.end()); TClientInfo& clientInfo = jt->second; - for (auto& c : clientInfo.ClientGroupsInfo) { - TClientGroupInfo& groupInfo = c.second; - for (auto& p : groupInfo.PartitionsInfo) { //TODO: reverse map - if (p.second.Session == pipe) { - p.second.Session = TActorId(); - p.second.State = EPS_FREE; - groupInfo.FreePartitions.push_back(p.first); + for (auto& [groupKey, groupInfo] : clientInfo.ClientGroupsInfo) { + for (auto& [partitionNumber, partitionInfo] : groupInfo.PartitionsInfo) { //TODO: reverse map + if (partitionInfo.Session == pipe) { + partitionInfo.Session = TActorId(); + partitionInfo.State = EPS_FREE; + groupInfo.FreePartitions.push_back(partitionNumber); } } bool res = groupInfo.EraseSession(pipe); if (res) - c.second.ScheduleBalance(ctx); + groupInfo.ScheduleBalance(ctx); } if (it->second.WithGroups && --clientInfo.SessionsWithGroup == 0) { clientInfo.MergeGroups(ctx); @@ -1393,61 +1392,54 @@ void TPersQueueReadBalancer::TClientGroupInfo::Balance(const TActorContext& ctx) ui32 allowPlusOne = total % sessionsCount; ui32 cur = allowPlusOne; //request partitions from sessions if needed - for (auto& p : SessionsInfo) { + for (auto& [sessionKey, sessionInfo] : SessionsInfo) { ui32 realDesired = (cur > 0) ? desired + 1 : desired; if (cur > 0) --cur; - if (p.second.NumActive <= realDesired + p.second.NumSuspended) { - continue; - } else { - ui32 canRequest = 0; - Y_VERIFY(p.second.NumActive > realDesired + p.second.NumSuspended); - canRequest = p.second.NumActive - realDesired - p.second.NumSuspended; - Y_VERIFY(canRequest > 0); - ReleasePartition(p.first.first, Group, canRequest, ctx); + + i64 canRequest = ((i64)sessionInfo.NumActive) - sessionInfo.NumSuspended - realDesired; + if (canRequest > 0) { + ReleasePartition(sessionKey.first, sessionInfo, Group, canRequest, ctx); } } //give free partitions to starving sessions if (FreePartitions.empty()) return; + cur = allowPlusOne; - for (auto& p : SessionsInfo) { + for (auto& [sessionKey, sessionInfo] : SessionsInfo) { ui32 realDesired = (cur > 0) ? desired + 1 : desired; if (cur > 0) --cur; - if( p.second.NumActive >= realDesired) continue; - ui32 req = realDesired - p.second.NumActive; - while (req > 0) { + + if(sessionInfo.NumActive >= realDesired) + continue; + + i64 req = ((i64)realDesired) - sessionInfo.NumActive; + while (req > 0 && !FreePartitions.empty()) { --req; - Y_VERIFY(!FreePartitions.empty()); - LockPartition(p.first.first, FreePartitions.front(), ctx); + LockPartition(sessionKey.first, sessionInfo, FreePartitions.front(), ctx); FreePartitions.pop_front(); if (FreePartitions.empty()) return; } - Y_VERIFY(p.second.NumActive >= desired && p.second.NumActive <= desired + 1); + Y_VERIFY(sessionInfo.NumActive >= desired && sessionInfo.NumActive <= desired + 1); } Y_VERIFY(FreePartitions.empty()); } -void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe, ui32 partition, const TActorContext& ctx) { - - auto jt = SessionsInfo.find(SessionKey(pipe)); - Y_VERIFY(jt != SessionsInfo.end()); - - auto& pipeInfo = jt->second; - +void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe, TSessionInfo& sessionInfo, ui32 partition, const TActorContext& ctx) { auto it = PartitionsInfo.find(partition); Y_VERIFY(it != PartitionsInfo.end()); it->second.Session = pipe; it->second.State = EPS_ACTIVE; - ++pipeInfo.NumActive; + ++sessionInfo.NumActive; //TODO:rebuild structs THolder<TEvPersQueue::TEvLockPartition> res{new TEvPersQueue::TEvLockPartition}; - res->Record.SetSession(pipeInfo.Session); + res->Record.SetSession(sessionInfo.Session); res->Record.SetPartition(partition); res->Record.SetTopic(Topic); res->Record.SetPath(Path); @@ -1458,17 +1450,12 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe res->Record.SetTabletId(PartitionsInfo[partition].TabletId); LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "client " << ClientId << " lock partition for pipe " - << pipe << " session " << pipeInfo.Session << " partition " << partition << " generation " << Generation << " step " << *Step); + << pipe << " session " << sessionInfo.Session << " partition " << partition << " generation " << Generation << " step " << *Step); - ctx.Send(pipeInfo.Sender, res.Release()); + ctx.Send(sessionInfo.Sender, res.Release()); } -void TPersQueueReadBalancer::TClientGroupInfo::ReleasePartition(const TActorId pipe, const ui32 group, const ui32 count, const TActorContext& ctx) { - - auto it = SessionsInfo.find(SessionKey(pipe)); - Y_VERIFY(it != SessionsInfo.end()); - auto& sessionInfo = it->second; - +void TPersQueueReadBalancer::TClientGroupInfo::ReleasePartition(const TActorId pipe, TSessionInfo& sessionInfo, const ui32 group, const ui32 count, const TActorContext& ctx) { sessionInfo.NumSuspended += count; THolder<TEvPersQueue::TEvReleasePartition> res{new TEvPersQueue::TEvReleasePartition}; diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index 441959363cb..7b0090a1f96 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -419,8 +419,8 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa void ScheduleBalance(const TActorContext& ctx); void Balance(const TActorContext& ctx); - void LockPartition(const TActorId pipe, ui32 partition, const TActorContext& ctx); - void ReleasePartition(const TActorId pipe, const ui32 group, const ui32 count, const TActorContext& ctx); + void LockPartition(const TActorId pipe, TSessionInfo& sessionInfo, ui32 partition, const TActorContext& ctx); + void ReleasePartition(const TActorId pipe, TSessionInfo& sessionInfo, const ui32 group, const ui32 count, const TActorContext& ctx); TStringBuilder GetPrefix() const; bool WakeupScheduled = false; |