aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-04-06 22:28:53 +0300
committertesseract <tesseract@yandex-team.com>2023-04-06 22:28:53 +0300
commit042837d631ba8e9b65e4e1ba864b8efbbbaf073b (patch)
tree565aeec7989a7dabd76566e47ca1e01fb5431450
parent25e969329b290c11487f74f566c225593e9661d4 (diff)
downloadydb-042837d631ba8e9b65e4e1ba864b8efbbbaf073b.tar.gz
Simplify code
-rw-r--r--ydb/core/persqueue/read_balancer.cpp69
-rw-r--r--ydb/core/persqueue/read_balancer.h4
2 files changed, 30 insertions, 43 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 16e02cdd38c..52e4b074dda 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -1333,19 +1333,18 @@ void TPersQueueReadBalancer::UnregisterSession(const TActorId& pipe, const TActo
auto jt = ClientsInfo.find(clientId);
Y_VERIFY(jt != ClientsInfo.end());
TClientInfo& clientInfo = jt->second;
- for (auto& c : clientInfo.ClientGroupsInfo) {
- TClientGroupInfo& groupInfo = c.second;
- for (auto& p : groupInfo.PartitionsInfo) { //TODO: reverse map
- if (p.second.Session == pipe) {
- p.second.Session = TActorId();
- p.second.State = EPS_FREE;
- groupInfo.FreePartitions.push_back(p.first);
+ for (auto& [groupKey, groupInfo] : clientInfo.ClientGroupsInfo) {
+ for (auto& [partitionNumber, partitionInfo] : groupInfo.PartitionsInfo) { //TODO: reverse map
+ if (partitionInfo.Session == pipe) {
+ partitionInfo.Session = TActorId();
+ partitionInfo.State = EPS_FREE;
+ groupInfo.FreePartitions.push_back(partitionNumber);
}
}
bool res = groupInfo.EraseSession(pipe);
if (res)
- c.second.ScheduleBalance(ctx);
+ groupInfo.ScheduleBalance(ctx);
}
if (it->second.WithGroups && --clientInfo.SessionsWithGroup == 0) {
clientInfo.MergeGroups(ctx);
@@ -1393,61 +1392,54 @@ void TPersQueueReadBalancer::TClientGroupInfo::Balance(const TActorContext& ctx)
ui32 allowPlusOne = total % sessionsCount;
ui32 cur = allowPlusOne;
//request partitions from sessions if needed
- for (auto& p : SessionsInfo) {
+ for (auto& [sessionKey, sessionInfo] : SessionsInfo) {
ui32 realDesired = (cur > 0) ? desired + 1 : desired;
if (cur > 0)
--cur;
- if (p.second.NumActive <= realDesired + p.second.NumSuspended) {
- continue;
- } else {
- ui32 canRequest = 0;
- Y_VERIFY(p.second.NumActive > realDesired + p.second.NumSuspended);
- canRequest = p.second.NumActive - realDesired - p.second.NumSuspended;
- Y_VERIFY(canRequest > 0);
- ReleasePartition(p.first.first, Group, canRequest, ctx);
+
+ i64 canRequest = ((i64)sessionInfo.NumActive) - sessionInfo.NumSuspended - realDesired;
+ if (canRequest > 0) {
+ ReleasePartition(sessionKey.first, sessionInfo, Group, canRequest, ctx);
}
}
//give free partitions to starving sessions
if (FreePartitions.empty())
return;
+
cur = allowPlusOne;
- for (auto& p : SessionsInfo) {
+ for (auto& [sessionKey, sessionInfo] : SessionsInfo) {
ui32 realDesired = (cur > 0) ? desired + 1 : desired;
if (cur > 0)
--cur;
- if( p.second.NumActive >= realDesired) continue;
- ui32 req = realDesired - p.second.NumActive;
- while (req > 0) {
+
+ if(sessionInfo.NumActive >= realDesired)
+ continue;
+
+ i64 req = ((i64)realDesired) - sessionInfo.NumActive;
+ while (req > 0 && !FreePartitions.empty()) {
--req;
- Y_VERIFY(!FreePartitions.empty());
- LockPartition(p.first.first, FreePartitions.front(), ctx);
+ LockPartition(sessionKey.first, sessionInfo, FreePartitions.front(), ctx);
FreePartitions.pop_front();
if (FreePartitions.empty())
return;
}
- Y_VERIFY(p.second.NumActive >= desired && p.second.NumActive <= desired + 1);
+ Y_VERIFY(sessionInfo.NumActive >= desired && sessionInfo.NumActive <= desired + 1);
}
Y_VERIFY(FreePartitions.empty());
}
-void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe, ui32 partition, const TActorContext& ctx) {
-
- auto jt = SessionsInfo.find(SessionKey(pipe));
- Y_VERIFY(jt != SessionsInfo.end());
-
- auto& pipeInfo = jt->second;
-
+void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe, TSessionInfo& sessionInfo, ui32 partition, const TActorContext& ctx) {
auto it = PartitionsInfo.find(partition);
Y_VERIFY(it != PartitionsInfo.end());
it->second.Session = pipe;
it->second.State = EPS_ACTIVE;
- ++pipeInfo.NumActive;
+ ++sessionInfo.NumActive;
//TODO:rebuild structs
THolder<TEvPersQueue::TEvLockPartition> res{new TEvPersQueue::TEvLockPartition};
- res->Record.SetSession(pipeInfo.Session);
+ res->Record.SetSession(sessionInfo.Session);
res->Record.SetPartition(partition);
res->Record.SetTopic(Topic);
res->Record.SetPath(Path);
@@ -1458,17 +1450,12 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe
res->Record.SetTabletId(PartitionsInfo[partition].TabletId);
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "client " << ClientId << " lock partition for pipe "
- << pipe << " session " << pipeInfo.Session << " partition " << partition << " generation " << Generation << " step " << *Step);
+ << pipe << " session " << sessionInfo.Session << " partition " << partition << " generation " << Generation << " step " << *Step);
- ctx.Send(pipeInfo.Sender, res.Release());
+ ctx.Send(sessionInfo.Sender, res.Release());
}
-void TPersQueueReadBalancer::TClientGroupInfo::ReleasePartition(const TActorId pipe, const ui32 group, const ui32 count, const TActorContext& ctx) {
-
- auto it = SessionsInfo.find(SessionKey(pipe));
- Y_VERIFY(it != SessionsInfo.end());
- auto& sessionInfo = it->second;
-
+void TPersQueueReadBalancer::TClientGroupInfo::ReleasePartition(const TActorId pipe, TSessionInfo& sessionInfo, const ui32 group, const ui32 count, const TActorContext& ctx) {
sessionInfo.NumSuspended += count;
THolder<TEvPersQueue::TEvReleasePartition> res{new TEvPersQueue::TEvReleasePartition};
diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h
index 441959363cb..7b0090a1f96 100644
--- a/ydb/core/persqueue/read_balancer.h
+++ b/ydb/core/persqueue/read_balancer.h
@@ -419,8 +419,8 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
void ScheduleBalance(const TActorContext& ctx);
void Balance(const TActorContext& ctx);
- void LockPartition(const TActorId pipe, ui32 partition, const TActorContext& ctx);
- void ReleasePartition(const TActorId pipe, const ui32 group, const ui32 count, const TActorContext& ctx);
+ void LockPartition(const TActorId pipe, TSessionInfo& sessionInfo, ui32 partition, const TActorContext& ctx);
+ void ReleasePartition(const TActorId pipe, TSessionInfo& sessionInfo, const ui32 group, const ui32 count, const TActorContext& ctx);
TStringBuilder GetPrefix() const;
bool WakeupScheduled = false;