diff options
author | komels <komels@ydb.tech> | 2022-09-16 14:01:04 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2022-09-16 14:01:04 +0300 |
commit | a2adf32166121a2e9e6b4113f4c05fdc703d01ca (patch) | |
tree | dfe6d78cd479b8d67d61e9c25e97cba4d00eb76b | |
parent | 5fbc1694e011960d1cf103b82034a0ff61f0d6ce (diff) | |
download | ydb-a2adf32166121a2e9e6b4113f4c05fdc703d01ca.tar.gz |
Refactor node id overried in pq meta request
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.cpp | 20 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.h | 2 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metarequest.cpp | 13 |
3 files changed, 14 insertions, 21 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index 99a715f09a6..2dca4689cce 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -419,24 +419,14 @@ NKikimrClient::TResponse TPersQueueBaseRequestProcessor::MergeSubactorReplies() TPersQueueBaseRequestProcessor::TNodesInfo::TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply) : NodesInfoReply(std::move(nodesInfoReply)) { - THashMap<TString, ui32> staticNodeIdsByHost; - THashMap<TString, TVector<ui32>> dynamicNodeIdsByHost; HostNames.reserve(NodesInfoReply->Nodes.size()); for (const NActors::TEvInterconnect::TNodeInfo& info : NodesInfoReply->Nodes) { HostNames.emplace(info.NodeId, info.Host); - if (info.NodeId < 1000) { - staticNodeIdsByHost.insert(std::make_pair(info.Host, info.NodeId)); - } else { - dynamicNodeIdsByHost[info.Host].push_back(info.NodeId); - } - } - for (const auto& [host, nodeIds] : dynamicNodeIdsByHost) { - auto staticIter = staticNodeIdsByHost.find(host); - if (staticIter.IsEnd()) - continue; - for (auto& dynamicId : nodeIds) { - auto insRes = DynamicNodeIdsOverride.insert(std::make_pair(dynamicId, staticIter->second)); - Y_VERIFY(insRes.second); + auto insRes = MinNodeIdByHost.insert(std::make_pair(info.Host, info.NodeId)); + if (insRes.second) { + if (insRes.first->second > info.NodeId) { + insRes.first->second = info.NodeId; + } } } } diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h index 8997af055c4..3a61476e53d 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.h +++ b/ydb/core/client/server/msgbus_server_persqueue.h @@ -83,7 +83,7 @@ public: struct TNodesInfo { THolder<TEvInterconnect::TEvNodesInfo> NodesInfoReply; THashMap<ui32, TString> HostNames; - THashMap<ui32, ui32> DynamicNodeIdsOverride; + THashMap<TString, ui32> MinNodeIdByHost; explicit TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply); }; diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp index 11152f41de3..3e68d399dbc 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp @@ -417,12 +417,15 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer( if (hostName != NodesInfo->HostNames.end()) { location.SetHost(hostName->second); location.SetErrorCode(NPersQueue::NErrorCode::OK); - auto dynamicIter = NodesInfo->DynamicNodeIdsOverride.find(nodeId); - if (dynamicIter.IsEnd()) { + if (nodeId < 1000) { location.SetHostId(nodeId); - } else { - Y_VERIFY(dynamicIter->first > dynamicIter->second); - location.SetHostId(dynamicIter->second); + } else { + auto minIter = NodesInfo->MinNodeIdByHost.find(hostName->second); + if (minIter.IsEnd()) { + location.SetHostId(nodeId); + } else { + location.SetHostId(minIter->second); + } } } else { statusInitializing = true; |