aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2022-09-16 14:01:04 +0300
committerkomels <komels@ydb.tech>2022-09-16 14:01:04 +0300
commita2adf32166121a2e9e6b4113f4c05fdc703d01ca (patch)
treedfe6d78cd479b8d67d61e9c25e97cba4d00eb76b
parent5fbc1694e011960d1cf103b82034a0ff61f0d6ce (diff)
downloadydb-a2adf32166121a2e9e6b4113f4c05fdc703d01ca.tar.gz
Refactor node id overried in pq meta request
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp20
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.h2
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metarequest.cpp13
3 files changed, 14 insertions, 21 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index 99a715f09a6..2dca4689cce 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -419,24 +419,14 @@ NKikimrClient::TResponse TPersQueueBaseRequestProcessor::MergeSubactorReplies()
TPersQueueBaseRequestProcessor::TNodesInfo::TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply)
: NodesInfoReply(std::move(nodesInfoReply))
{
- THashMap<TString, ui32> staticNodeIdsByHost;
- THashMap<TString, TVector<ui32>> dynamicNodeIdsByHost;
HostNames.reserve(NodesInfoReply->Nodes.size());
for (const NActors::TEvInterconnect::TNodeInfo& info : NodesInfoReply->Nodes) {
HostNames.emplace(info.NodeId, info.Host);
- if (info.NodeId < 1000) {
- staticNodeIdsByHost.insert(std::make_pair(info.Host, info.NodeId));
- } else {
- dynamicNodeIdsByHost[info.Host].push_back(info.NodeId);
- }
- }
- for (const auto& [host, nodeIds] : dynamicNodeIdsByHost) {
- auto staticIter = staticNodeIdsByHost.find(host);
- if (staticIter.IsEnd())
- continue;
- for (auto& dynamicId : nodeIds) {
- auto insRes = DynamicNodeIdsOverride.insert(std::make_pair(dynamicId, staticIter->second));
- Y_VERIFY(insRes.second);
+ auto insRes = MinNodeIdByHost.insert(std::make_pair(info.Host, info.NodeId));
+ if (insRes.second) {
+ if (insRes.first->second > info.NodeId) {
+ insRes.first->second = info.NodeId;
+ }
}
}
}
diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h
index 8997af055c4..3a61476e53d 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.h
+++ b/ydb/core/client/server/msgbus_server_persqueue.h
@@ -83,7 +83,7 @@ public:
struct TNodesInfo {
THolder<TEvInterconnect::TEvNodesInfo> NodesInfoReply;
THashMap<ui32, TString> HostNames;
- THashMap<ui32, ui32> DynamicNodeIdsOverride;
+ THashMap<TString, ui32> MinNodeIdByHost;
explicit TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply);
};
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
index 11152f41de3..3e68d399dbc 100644
--- a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
@@ -417,12 +417,15 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer(
if (hostName != NodesInfo->HostNames.end()) {
location.SetHost(hostName->second);
location.SetErrorCode(NPersQueue::NErrorCode::OK);
- auto dynamicIter = NodesInfo->DynamicNodeIdsOverride.find(nodeId);
- if (dynamicIter.IsEnd()) {
+ if (nodeId < 1000) {
location.SetHostId(nodeId);
- } else {
- Y_VERIFY(dynamicIter->first > dynamicIter->second);
- location.SetHostId(dynamicIter->second);
+ } else {
+ auto minIter = NodesInfo->MinNodeIdByHost.find(hostName->second);
+ if (minIter.IsEnd()) {
+ location.SetHostId(nodeId);
+ } else {
+ location.SetHostId(minIter->second);
+ }
}
} else {
statusInitializing = true;