diff options
author | tesseract <tesseract@yandex-team.com> | 2023-04-04 18:48:39 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-04-04 18:48:39 +0300 |
commit | aef659b030e121f88e0f2f7d72ed739880cedb9c (patch) | |
tree | d8913924d9e8dd087a4664ab4d2379990d14d794 | |
parent | c54a8c24834499a7f9ae9bb8b79408990a231fd7 (diff) | |
download | ydb-aef659b030e121f88e0f2f7d72ed739880cedb9c.tar.gz |
Removing code duplication
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 84 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer.h | 10 |
2 files changed, 59 insertions, 35 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index ed11ad4eed6..791a8f819a9 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -300,9 +300,9 @@ TString TPersQueueReadBalancer::GenerateStat() { TABLED() { str << ci.second.Group;} TABLED() { str << pp.second.TabletId;} TABLED() { str << (ui32)pp.second.State;} - auto it = ci.second.SessionsInfo.find(std::make_pair(pp.second.Session, ci.second.RandomNumber)); - Y_VERIFY((it == ci.second.SessionsInfo.end()) == (pp.second.State == EPS_FREE)); - TABLED() { str << (pp.second.State != EPS_FREE ? it->second.Session : "");} + auto* session = ci.second.FindSession(pp.second.Session); + Y_VERIFY((session == nullptr) == (pp.second.State == EPS_FREE)); + TABLED() { str << (pp.second.State != EPS_FREE ? session->Session : "");} } } } @@ -605,7 +605,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr PartitionsInfo = partitionsInfo; for (auto& p : ClientsInfo) { - auto mainGroup = p.second.ClientGroupsInfo.find(0); + auto mainGroup = p.second.ClientGroupsInfo.find(TClientInfo::MAIN_GROUP); for (auto& part : newPartitions) { ui32 group = part.second.Group; auto it = p.second.SessionsWithGroup ? p.second.ClientGroupsInfo.find(group) : mainGroup; @@ -1037,7 +1037,7 @@ TPersQueueReadBalancer::TClientGroupInfo& TPersQueueReadBalancer::TClientInfo::A clientInfo.Generation = Generation; clientInfo.Step = &Step; - clientInfo.RandomNumber = TAppData::RandomProvider->GenRand64(); + clientInfo.SessionKeySalt = TAppData::RandomProvider->GenRand64(); return clientInfo; } @@ -1065,7 +1065,7 @@ void TPersQueueReadBalancer::TClientInfo::AddSession(const ui32 group, const THa auto it = ClientGroupsInfo.find(group); it->second.SessionsInfo.insert({ - std::make_pair(pipe, it->second.RandomNumber), + it->second.SessionKey(pipe), TClientGroupInfo::TSessionInfo( record.GetSession(), sender, record.HasClientNode() ? record.GetClientNode() : "none", @@ -1144,7 +1144,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvRegisterReadSession::TPtr& if (!groups.empty()) { auto jt = it->second.ClientGroupsInfo.find(0); if (jt != it->second.ClientGroupsInfo.end()) { - it->second.KillGroup(0, ctx); + it->second.KillSessionsWithoutGroup(ctx); } for (auto g : groups) { it->second.AddSession(g, PartitionsInfo, ev->Sender, record); @@ -1176,13 +1176,13 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr& auto pi = response->Record.AddPartitionInfo(); pi->SetPartition(p.first); if (p.second.State == EPS_ACTIVE) { - auto jt = c.second.SessionsInfo.find(std::make_pair(p.second.Session, c.second.RandomNumber)); - Y_VERIFY(jt != c.second.SessionsInfo.end()); - pi->SetClientNode(jt->second.ClientNode); - pi->SetProxyNodeId(jt->second.ProxyNodeId); - pi->SetSession(jt->second.Session); - pi->SetTimestamp(jt->second.Timestamp.Seconds()); - pi->SetTimestampMs(jt->second.Timestamp.MilliSeconds()); + auto* session = c.second.FindSession(p.second.Session); + Y_VERIFY(session != nullptr); + pi->SetClientNode(session->ClientNode); + pi->SetProxyNodeId(session->ProxyNodeId); + pi->SetSession(session->Session); + pi->SetTimestamp(session->Timestamp.Seconds()); + pi->SetTimestampMs(session->Timestamp.MilliSeconds()); } else { pi->SetClientNode(""); pi->SetProxyNodeId(0); @@ -1203,9 +1203,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr& } -void TPersQueueReadBalancer::TClientInfo::KillGroup(const ui32 group, const TActorContext& ctx) { - Y_VERIFY(group == 0); - auto it = ClientGroupsInfo.find(group); +void TPersQueueReadBalancer::TClientInfo::KillSessionsWithoutGroup(const TActorContext& ctx) { + auto it = ClientGroupsInfo.find(MAIN_GROUP); Y_VERIFY(it != ClientGroupsInfo.end()); for (auto& s : it->second.SessionsInfo) { THolder<TEvPersQueue::TEvError> response(new TEvPersQueue::TEvError); @@ -1222,14 +1221,14 @@ void TPersQueueReadBalancer::TClientInfo::MergeGroups(const TActorContext& ctx) LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "client " << ClientId << " merge groups"); - auto& clientInfo = AddGroup(0); + auto& clientInfo = AddGroup(MAIN_GROUP); ui32 numSessions = 0; ui32 numGroups = 0; for (auto it = ClientGroupsInfo.begin(); it != ClientGroupsInfo.end();) { auto jt = it++; - if (jt->first == 0) { + if (jt->first == MAIN_GROUP) { continue; } ++numGroups; @@ -1239,7 +1238,7 @@ void TPersQueueReadBalancer::TClientInfo::MergeGroups(const TActorContext& ctx) } for (auto& si : jt->second.SessionsInfo) { auto key = si.first; - key.second = clientInfo.RandomNumber; + key.second = clientInfo.SessionKeySalt; auto it = clientInfo.SessionsInfo.find(key); if (it == clientInfo.SessionsInfo.end()) { clientInfo.SessionsInfo.insert(std::make_pair(key, si.second)); //there must be all sessions in all groups @@ -1296,18 +1295,17 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvPartitionReleased::TPtr& ev auto jt = cit->second.PartitionsInfo.find(record.GetPartition()); - auto kt = cit->second.SessionsInfo.find(std::make_pair(sender, cit->second.RandomNumber)); - if (kt == cit->second.SessionsInfo.end()) { //already dead session + auto* session = cit->second.FindSession(sender); + if (session == nullptr) { //already dead session return; } - Y_VERIFY(kt != cit->second.SessionsInfo.end()); Y_VERIFY(jt != cit->second.PartitionsInfo.end()); jt->second.Session = TActorId(); jt->second.State = EPS_FREE; cit->second.FreePartitions.push_back(jt->first); - --kt->second.NumActive; - --kt->second.NumSuspended; + --session->NumActive; + --session->NumSuspended; cit->second.ScheduleBalance(ctx); } @@ -1339,25 +1337,45 @@ void TPersQueueReadBalancer::UnregisterSession(const TActorId& pipe, const TActo const TString& clientId = it->second.ClientId; auto jt = ClientsInfo.find(clientId); Y_VERIFY(jt != ClientsInfo.end()); - for (auto& c : jt->second.ClientGroupsInfo) { - for (auto& p : c.second.PartitionsInfo) { //TODO: reverse map + TClientInfo& clientInfo = jt->second; + for (auto& c : clientInfo.ClientGroupsInfo) { + TClientGroupInfo& groupInfo = c.second; + for (auto& p : groupInfo.PartitionsInfo) { //TODO: reverse map if (p.second.Session == pipe) { p.second.Session = TActorId(); p.second.State = EPS_FREE; - c.second.FreePartitions.push_back(p.first); + groupInfo.FreePartitions.push_back(p.first); } } - bool res = c.second.SessionsInfo.erase(std::make_pair(pipe, c.second.RandomNumber)); + + bool res = groupInfo.EraseSession(pipe); if (res) c.second.ScheduleBalance(ctx); } - if (it->second.WithGroups && --jt->second.SessionsWithGroup == 0) { - jt->second.MergeGroups(ctx); + if (it->second.WithGroups && --clientInfo.SessionsWithGroup == 0) { + clientInfo.MergeGroups(ctx); } PipesInfo.erase(pipe); } + +std::pair<TActorId, ui64> TPersQueueReadBalancer::TClientGroupInfo::SessionKey(const TActorId pipe) const { + return std::make_pair(pipe, SessionKeySalt); +} + +bool TPersQueueReadBalancer::TClientGroupInfo::EraseSession(const TActorId pipe) { + return SessionsInfo.erase(SessionKey(pipe)); +} + +TPersQueueReadBalancer::TClientGroupInfo::TSessionInfo* TPersQueueReadBalancer::TClientGroupInfo::FindSession(const TActorId pipe) { + auto it = SessionsInfo.find(SessionKey(pipe)); + if (it == SessionsInfo.end()) { + return nullptr; + } + return &(it->second); +} + void TPersQueueReadBalancer::TClientGroupInfo::ScheduleBalance(const TActorContext& ctx) { if (WakeupScheduled) return; @@ -1421,7 +1439,7 @@ void TPersQueueReadBalancer::TClientGroupInfo::Balance(const TActorContext& ctx) void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe, ui32 partition, const TActorContext& ctx) { - auto jt = SessionsInfo.find(std::make_pair(pipe, RandomNumber)); + auto jt = SessionsInfo.find(SessionKey(pipe)); Y_VERIFY(jt != SessionsInfo.end()); auto& pipeInfo = jt->second; @@ -1452,7 +1470,7 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe void TPersQueueReadBalancer::TClientGroupInfo::ReleasePartition(const TActorId pipe, const ui32 group, const ui32 count, const TActorContext& ctx) { - auto it = SessionsInfo.find(std::make_pair(pipe, RandomNumber)); + auto it = SessionsInfo.find(SessionKey(pipe)); Y_VERIFY(it != SessionsInfo.end()); auto& sessionInfo = it->second; diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index ea40500d4d1..441959363cb 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -404,7 +404,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa ui64 TabletId; TString Path; ui32 Generation = 0; - ui64 RandomNumber = 0; + ui64 SessionKeySalt = 0; ui32* Step = nullptr; ui32 Group = 0; @@ -413,6 +413,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa std::deque<ui32> FreePartitions; THashMap<std::pair<TActorId, ui64>, TSessionInfo> SessionsInfo; //map from ActorID and random value - need for reordering sessions in different topics + std::pair<TActorId, ui64> SessionKey(const TActorId pipe) const; + bool EraseSession(const TActorId pipe); + TSessionInfo* FindSession(const TActorId pipe); + void ScheduleBalance(const TActorContext& ctx); void Balance(const TActorContext& ctx); void LockPartition(const TActorId pipe, ui32 partition, const TActorContext& ctx); @@ -436,6 +440,8 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa struct TClientInfo { + constexpr static ui32 MAIN_GROUP = 0; + THashMap<ui32, TClientGroupInfo> ClientGroupsInfo; //map from group to info ui32 SessionsWithGroup = 0; @@ -446,7 +452,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa ui32 Generation = 0; ui32 Step = 0; - void KillGroup(const ui32 group, const TActorContext& ctx); + void KillSessionsWithoutGroup(const TActorContext& ctx); void MergeGroups(const TActorContext& ctx); TClientGroupInfo& AddGroup(const ui32 group); void FillEmptyGroup(const ui32 group, const THashMap<ui32, TPartitionInfo>& partitionsInfo); |