aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-04-04 18:48:39 +0300
committertesseract <tesseract@yandex-team.com>2023-04-04 18:48:39 +0300
commitaef659b030e121f88e0f2f7d72ed739880cedb9c (patch)
treed8913924d9e8dd087a4664ab4d2379990d14d794
parentc54a8c24834499a7f9ae9bb8b79408990a231fd7 (diff)
downloadydb-aef659b030e121f88e0f2f7d72ed739880cedb9c.tar.gz
Removing code duplication
-rw-r--r--ydb/core/persqueue/read_balancer.cpp84
-rw-r--r--ydb/core/persqueue/read_balancer.h10
2 files changed, 59 insertions, 35 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index ed11ad4eed6..791a8f819a9 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -300,9 +300,9 @@ TString TPersQueueReadBalancer::GenerateStat() {
TABLED() { str << ci.second.Group;}
TABLED() { str << pp.second.TabletId;}
TABLED() { str << (ui32)pp.second.State;}
- auto it = ci.second.SessionsInfo.find(std::make_pair(pp.second.Session, ci.second.RandomNumber));
- Y_VERIFY((it == ci.second.SessionsInfo.end()) == (pp.second.State == EPS_FREE));
- TABLED() { str << (pp.second.State != EPS_FREE ? it->second.Session : "");}
+ auto* session = ci.second.FindSession(pp.second.Session);
+ Y_VERIFY((session == nullptr) == (pp.second.State == EPS_FREE));
+ TABLED() { str << (pp.second.State != EPS_FREE ? session->Session : "");}
}
}
}
@@ -605,7 +605,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
PartitionsInfo = partitionsInfo;
for (auto& p : ClientsInfo) {
- auto mainGroup = p.second.ClientGroupsInfo.find(0);
+ auto mainGroup = p.second.ClientGroupsInfo.find(TClientInfo::MAIN_GROUP);
for (auto& part : newPartitions) {
ui32 group = part.second.Group;
auto it = p.second.SessionsWithGroup ? p.second.ClientGroupsInfo.find(group) : mainGroup;
@@ -1037,7 +1037,7 @@ TPersQueueReadBalancer::TClientGroupInfo& TPersQueueReadBalancer::TClientInfo::A
clientInfo.Generation = Generation;
clientInfo.Step = &Step;
- clientInfo.RandomNumber = TAppData::RandomProvider->GenRand64();
+ clientInfo.SessionKeySalt = TAppData::RandomProvider->GenRand64();
return clientInfo;
}
@@ -1065,7 +1065,7 @@ void TPersQueueReadBalancer::TClientInfo::AddSession(const ui32 group, const THa
auto it = ClientGroupsInfo.find(group);
it->second.SessionsInfo.insert({
- std::make_pair(pipe, it->second.RandomNumber),
+ it->second.SessionKey(pipe),
TClientGroupInfo::TSessionInfo(
record.GetSession(), sender,
record.HasClientNode() ? record.GetClientNode() : "none",
@@ -1144,7 +1144,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvRegisterReadSession::TPtr&
if (!groups.empty()) {
auto jt = it->second.ClientGroupsInfo.find(0);
if (jt != it->second.ClientGroupsInfo.end()) {
- it->second.KillGroup(0, ctx);
+ it->second.KillSessionsWithoutGroup(ctx);
}
for (auto g : groups) {
it->second.AddSession(g, PartitionsInfo, ev->Sender, record);
@@ -1176,13 +1176,13 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr&
auto pi = response->Record.AddPartitionInfo();
pi->SetPartition(p.first);
if (p.second.State == EPS_ACTIVE) {
- auto jt = c.second.SessionsInfo.find(std::make_pair(p.second.Session, c.second.RandomNumber));
- Y_VERIFY(jt != c.second.SessionsInfo.end());
- pi->SetClientNode(jt->second.ClientNode);
- pi->SetProxyNodeId(jt->second.ProxyNodeId);
- pi->SetSession(jt->second.Session);
- pi->SetTimestamp(jt->second.Timestamp.Seconds());
- pi->SetTimestampMs(jt->second.Timestamp.MilliSeconds());
+ auto* session = c.second.FindSession(p.second.Session);
+ Y_VERIFY(session != nullptr);
+ pi->SetClientNode(session->ClientNode);
+ pi->SetProxyNodeId(session->ProxyNodeId);
+ pi->SetSession(session->Session);
+ pi->SetTimestamp(session->Timestamp.Seconds());
+ pi->SetTimestampMs(session->Timestamp.MilliSeconds());
} else {
pi->SetClientNode("");
pi->SetProxyNodeId(0);
@@ -1203,9 +1203,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr&
}
-void TPersQueueReadBalancer::TClientInfo::KillGroup(const ui32 group, const TActorContext& ctx) {
- Y_VERIFY(group == 0);
- auto it = ClientGroupsInfo.find(group);
+void TPersQueueReadBalancer::TClientInfo::KillSessionsWithoutGroup(const TActorContext& ctx) {
+ auto it = ClientGroupsInfo.find(MAIN_GROUP);
Y_VERIFY(it != ClientGroupsInfo.end());
for (auto& s : it->second.SessionsInfo) {
THolder<TEvPersQueue::TEvError> response(new TEvPersQueue::TEvError);
@@ -1222,14 +1221,14 @@ void TPersQueueReadBalancer::TClientInfo::MergeGroups(const TActorContext& ctx)
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "client " << ClientId << " merge groups");
- auto& clientInfo = AddGroup(0);
+ auto& clientInfo = AddGroup(MAIN_GROUP);
ui32 numSessions = 0;
ui32 numGroups = 0;
for (auto it = ClientGroupsInfo.begin(); it != ClientGroupsInfo.end();) {
auto jt = it++;
- if (jt->first == 0) {
+ if (jt->first == MAIN_GROUP) {
continue;
}
++numGroups;
@@ -1239,7 +1238,7 @@ void TPersQueueReadBalancer::TClientInfo::MergeGroups(const TActorContext& ctx)
}
for (auto& si : jt->second.SessionsInfo) {
auto key = si.first;
- key.second = clientInfo.RandomNumber;
+ key.second = clientInfo.SessionKeySalt;
auto it = clientInfo.SessionsInfo.find(key);
if (it == clientInfo.SessionsInfo.end()) {
clientInfo.SessionsInfo.insert(std::make_pair(key, si.second)); //there must be all sessions in all groups
@@ -1296,18 +1295,17 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvPartitionReleased::TPtr& ev
auto jt = cit->second.PartitionsInfo.find(record.GetPartition());
- auto kt = cit->second.SessionsInfo.find(std::make_pair(sender, cit->second.RandomNumber));
- if (kt == cit->second.SessionsInfo.end()) { //already dead session
+ auto* session = cit->second.FindSession(sender);
+ if (session == nullptr) { //already dead session
return;
}
- Y_VERIFY(kt != cit->second.SessionsInfo.end());
Y_VERIFY(jt != cit->second.PartitionsInfo.end());
jt->second.Session = TActorId();
jt->second.State = EPS_FREE;
cit->second.FreePartitions.push_back(jt->first);
- --kt->second.NumActive;
- --kt->second.NumSuspended;
+ --session->NumActive;
+ --session->NumSuspended;
cit->second.ScheduleBalance(ctx);
}
@@ -1339,25 +1337,45 @@ void TPersQueueReadBalancer::UnregisterSession(const TActorId& pipe, const TActo
const TString& clientId = it->second.ClientId;
auto jt = ClientsInfo.find(clientId);
Y_VERIFY(jt != ClientsInfo.end());
- for (auto& c : jt->second.ClientGroupsInfo) {
- for (auto& p : c.second.PartitionsInfo) { //TODO: reverse map
+ TClientInfo& clientInfo = jt->second;
+ for (auto& c : clientInfo.ClientGroupsInfo) {
+ TClientGroupInfo& groupInfo = c.second;
+ for (auto& p : groupInfo.PartitionsInfo) { //TODO: reverse map
if (p.second.Session == pipe) {
p.second.Session = TActorId();
p.second.State = EPS_FREE;
- c.second.FreePartitions.push_back(p.first);
+ groupInfo.FreePartitions.push_back(p.first);
}
}
- bool res = c.second.SessionsInfo.erase(std::make_pair(pipe, c.second.RandomNumber));
+
+ bool res = groupInfo.EraseSession(pipe);
if (res)
c.second.ScheduleBalance(ctx);
}
- if (it->second.WithGroups && --jt->second.SessionsWithGroup == 0) {
- jt->second.MergeGroups(ctx);
+ if (it->second.WithGroups && --clientInfo.SessionsWithGroup == 0) {
+ clientInfo.MergeGroups(ctx);
}
PipesInfo.erase(pipe);
}
+
+std::pair<TActorId, ui64> TPersQueueReadBalancer::TClientGroupInfo::SessionKey(const TActorId pipe) const {
+ return std::make_pair(pipe, SessionKeySalt);
+}
+
+bool TPersQueueReadBalancer::TClientGroupInfo::EraseSession(const TActorId pipe) {
+ return SessionsInfo.erase(SessionKey(pipe));
+}
+
+TPersQueueReadBalancer::TClientGroupInfo::TSessionInfo* TPersQueueReadBalancer::TClientGroupInfo::FindSession(const TActorId pipe) {
+ auto it = SessionsInfo.find(SessionKey(pipe));
+ if (it == SessionsInfo.end()) {
+ return nullptr;
+ }
+ return &(it->second);
+}
+
void TPersQueueReadBalancer::TClientGroupInfo::ScheduleBalance(const TActorContext& ctx) {
if (WakeupScheduled)
return;
@@ -1421,7 +1439,7 @@ void TPersQueueReadBalancer::TClientGroupInfo::Balance(const TActorContext& ctx)
void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe, ui32 partition, const TActorContext& ctx) {
- auto jt = SessionsInfo.find(std::make_pair(pipe, RandomNumber));
+ auto jt = SessionsInfo.find(SessionKey(pipe));
Y_VERIFY(jt != SessionsInfo.end());
auto& pipeInfo = jt->second;
@@ -1452,7 +1470,7 @@ void TPersQueueReadBalancer::TClientGroupInfo::LockPartition(const TActorId pipe
void TPersQueueReadBalancer::TClientGroupInfo::ReleasePartition(const TActorId pipe, const ui32 group, const ui32 count, const TActorContext& ctx) {
- auto it = SessionsInfo.find(std::make_pair(pipe, RandomNumber));
+ auto it = SessionsInfo.find(SessionKey(pipe));
Y_VERIFY(it != SessionsInfo.end());
auto& sessionInfo = it->second;
diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h
index ea40500d4d1..441959363cb 100644
--- a/ydb/core/persqueue/read_balancer.h
+++ b/ydb/core/persqueue/read_balancer.h
@@ -404,7 +404,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
ui64 TabletId;
TString Path;
ui32 Generation = 0;
- ui64 RandomNumber = 0;
+ ui64 SessionKeySalt = 0;
ui32* Step = nullptr;
ui32 Group = 0;
@@ -413,6 +413,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
std::deque<ui32> FreePartitions;
THashMap<std::pair<TActorId, ui64>, TSessionInfo> SessionsInfo; //map from ActorID and random value - need for reordering sessions in different topics
+ std::pair<TActorId, ui64> SessionKey(const TActorId pipe) const;
+ bool EraseSession(const TActorId pipe);
+ TSessionInfo* FindSession(const TActorId pipe);
+
void ScheduleBalance(const TActorContext& ctx);
void Balance(const TActorContext& ctx);
void LockPartition(const TActorId pipe, ui32 partition, const TActorContext& ctx);
@@ -436,6 +440,8 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
struct TClientInfo {
+ constexpr static ui32 MAIN_GROUP = 0;
+
THashMap<ui32, TClientGroupInfo> ClientGroupsInfo; //map from group to info
ui32 SessionsWithGroup = 0;
@@ -446,7 +452,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
ui32 Generation = 0;
ui32 Step = 0;
- void KillGroup(const ui32 group, const TActorContext& ctx);
+ void KillSessionsWithoutGroup(const TActorContext& ctx);
void MergeGroups(const TActorContext& ctx);
TClientGroupInfo& AddGroup(const ui32 group);
void FillEmptyGroup(const ui32 group, const THashMap<ui32, TPartitionInfo>& partitionsInfo);