diff options
author | komels <komels@ydb.tech> | 2023-02-14 14:05:38 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2023-02-14 14:05:38 +0300 |
commit | a01a146b112ce014853d21b4a11daa88a877c892 (patch) | |
tree | 5ca6e1976252425f9fab1dd71d3fd31b767b3d9d | |
parent | e0b2154fab00a08c07439524ad258e459b7bc4d3 (diff) | |
download | ydb-a01a146b112ce014853d21b4a11daa88a877c892.tar.gz |
- full mapping with hive interaction
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.cpp | 72 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.h | 16 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.cpp | 188 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.h | 17 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metarequest.cpp | 34 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 3 |
6 files changed, 250 insertions, 80 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index 0f0437c673..a7de796c8f 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -214,10 +214,9 @@ void TPersQueueBaseRequestProcessor::Die(const TActorContext& ctx) { STFUNC(TPersQueueBaseRequestProcessor::StateFunc) { switch (ev->GetTypeRewrite()) { HFunc(TEvInterconnect::TEvNodesInfo, Handle); - HFunc(TInterconnectProxyTCP::TEvStats, Handle) - CFunc(TEvents::TSystem::Undelivered, HandleUndelivered) HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse, Handle); HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse, Handle); + HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvGetNodesMappingResponse, Handle); HFunc(TEvPersQueue::TEvResponse, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); CFunc(NActors::TEvents::TSystem::PoisonPill, Die); @@ -251,17 +250,10 @@ void TPersQueueBaseRequestProcessor::Handle(TEvInterconnect::TEvNodesInfo::TPtr& } } -void TPersQueueBaseRequestProcessor::Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx) { - NodesInfo->PingReply(ev, ctx); - if (ReadyToCreateChildren()) { - if (CreateChildren(ctx)) { - return; - } - } -} - -void TPersQueueBaseRequestProcessor::HandleUndelivered(const TActorContext& ctx) { - NodesInfo->PingFailed(ctx); +void TPersQueueBaseRequestProcessor::Handle( + TEvPqNewMetaCache::TEvGetNodesMappingResponse::TPtr& ev, const TActorContext& ctx +) { + NodesInfo->ProcessNodesMapping(ev, ctx); if (ReadyToCreateChildren()) { if (CreateChildren(ctx)) { return; @@ -337,10 +329,14 @@ void TPersQueueBaseRequestProcessor::Handle( } bool TPersQueueBaseRequestProcessor::ReadyToCreateChildren() const { - return TopicsDescription && (!ListNodes || (NodesInfo.get() != nullptr && NodesInfo->Ready)); + return TopicsDescription + && (!ListNodes || (NodesInfo.get() != nullptr && NodesInfo->Ready)); } bool TPersQueueBaseRequestProcessor::CreateChildren(const TActorContext& ctx) { + if (ChildrenCreationDone) + return false; + ChildrenCreationDone = true; Y_VERIFY(TopicsDescription->ResultSet.size() == TopicsConverters.size()); ui32 i = 0; for (const auto& entry : TopicsDescription->ResultSet) { @@ -379,6 +375,7 @@ bool TPersQueueBaseRequestProcessor::CreateChildrenIfNeeded(const TActorContext& NeedChildrenCreation = false; THashSet<TString> topics; + while (!ChildrenToCreate.empty()) { THolder<TPerTopicInfo> perTopicInfo(ChildrenToCreate.front().Release()); ChildrenToCreate.pop_front(); @@ -402,7 +399,6 @@ bool TPersQueueBaseRequestProcessor::CreateChildrenIfNeeded(const TActorContext& Children.emplace(actorId, std::move(perTopicInfo)); } } - Y_VERIFY(topics.size() == Children.size()); if (!TopicsToRequest.empty() && TopicsToRequest.size() != topics.size()) { @@ -442,22 +438,15 @@ TPersQueueBaseRequestProcessor::TNodesInfo::TNodesInfo( : NodesInfoReply(std::move(nodesInfoReply)) { const auto& pqConfig = AppData(ctx)->PQConfig; - bool useMapping = pqConfig.GetDynNodePartitionLocationsMapping(); + bool useMapping = pqConfig.GetPQDiscoveryConfig().GetUseDynNodesMapping(); HostNames.reserve(NodesInfoReply->Nodes.size()); for (const NActors::TEvInterconnect::TNodeInfo& info : NodesInfoReply->Nodes) { HostNames.emplace(info.NodeId, info.Host); if (useMapping) { - TActorSystem* const as = ctx.ExecutorThread.ActorSystem; - if (info.NodeId > pqConfig.GetMaxStorageNodeId()) { - DynNodes.push_back(info.NodeId); - } else { - ctx.Send(as->InterconnectProxy(info.NodeId), new TInterconnectProxyTCP::TEvQueryStats, - IEventHandle::FlagTrackDelivery); - ++NodesPingsPending; - if (MaxStaticNodeId < info.NodeId) { - MaxStaticNodeId = info.NodeId; - } - } + ctx.Send( + CreatePersQueueMetaCacheV2Id(), + new TEvPqNewMetaCache::TEvGetNodesMappingRequest() + ); } else { Ready = true; auto insRes = MinNodeIdByHost.insert(std::make_pair(info.Host, info.NodeId)); @@ -478,35 +467,14 @@ TTopicInfoBasedActor::TTopicInfoBasedActor(const TSchemeEntry& topicEntry, const { } -void TPersQueueBaseRequestProcessor::TNodesInfo::PingReply( - TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx +void TPersQueueBaseRequestProcessor::TNodesInfo::ProcessNodesMapping( + TEvPqNewMetaCache::TEvGetNodesMappingResponse::TPtr& ev, const TActorContext& ) { - --NodesPingsPending; - if (ev->Get()->ProxyStats.Connected) { - StaticNodes.insert(ev->Get()->PeerNodeId); - } - FinalizeWhenReady(ctx); -} - -void TPersQueueBaseRequestProcessor::TNodesInfo::PingFailed(const TActorContext& ctx) { - --NodesPingsPending; - FinalizeWhenReady(ctx); -} - - -void TPersQueueBaseRequestProcessor::TNodesInfo::FinalizeWhenReady(const TActorContext& ctx) { - if (NodesPingsPending) - Finalize(ctx); -} - -void TPersQueueBaseRequestProcessor::TNodesInfo::Finalize(const TActorContext&) { - for (auto& dynNodeId : DynNodes) { - ui32 hash_ = dynNodeId % (MaxStaticNodeId + 1); - DynToStaticNode[dynNodeId] = *StaticNodes.lower_bound(hash_); - } + DynToStaticNode = std::move(ev->Get()->NodesMapping); Ready = true; } + void TTopicInfoBasedActor::Bootstrap(const TActorContext &ctx) { Become(&TTopicInfoBasedActor::StateFunc); BootstrapImpl(ctx); diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h index de5ed1c2a1..720527078a 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.h +++ b/ydb/core/client/server/msgbus_server_persqueue.h @@ -85,19 +85,15 @@ public: THolder<TEvInterconnect::TEvNodesInfo> NodesInfoReply; THashMap<ui32, TString> HostNames; THashMap<TString, ui32> MinNodeIdByHost; - THashMap<ui32, ui32> DynToStaticNode; + std::shared_ptr<THashMap<ui32, ui32>> DynToStaticNode; + bool Ready = false; - void PingReply(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx); - void PingFailed(const TActorContext& ctx); + void ProcessNodesMapping(NPqMetaCacheV2::TEvPqNewMetaCache::TEvGetNodesMappingResponse::TPtr& ev, + const TActorContext& ctx); explicit TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply, const TActorContext& ctx); private: void FinalizeWhenReady(const TActorContext& ctx); void Finalize(const TActorContext& ctx); - - ui64 NodesPingsPending = 0; - ui64 MaxStaticNodeId = 0; - TVector<ui32> DynNodes; - TSet<ui64> StaticNodes; }; public: @@ -107,6 +103,7 @@ public: bool NeedChildrenCreation = false; ui32 ChildrenCreated = 0; + bool ChildrenCreationDone = false; std::deque<THolder<TPerTopicInfo>> ChildrenToCreate; @@ -143,8 +140,7 @@ protected: virtual STFUNC(StateFunc); void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev, const TActorContext& ctx); - void Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx); - void HandleUndelivered(const TActorContext& ctx); + void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvGetNodesMappingResponse::TPtr& ev, const TActorContext& ctx); void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx); void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index ff0501fb45..91e7e7dac7 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -14,6 +14,9 @@ #include <ydb/core/persqueue/pq_database.h> #include <ydb/core/persqueue/cluster_tracker.h> +#include <library/cpp/actors/protos/actors.pb.h> +#include <library/cpp/actors/core/mon.h> +#include <library/cpp/json/json_reader.h> namespace NKikimr::NMsgBusProxy { @@ -94,6 +97,19 @@ public: if (!metaCacheConfig.GetLBFrontEnabled()) { return; } + if (metaCacheConfig.GetUseDynNodesMapping()) { + TStringBuf tenant = AppData(ctx)->TenantName; + tenant.SkipPrefix("/"); + tenant.ChopSuffix("/"); + if (tenant != "Root") { + LOG_NOTICE_S(ctx, NKikimrServices::PQ_METACACHE, "Started on tenant = '" << tenant << "', will not request hive"); + OnDynNode = true; + } else { + StartHivePipe(ctx); + ProcessNodesInfoWork(ctx); + } + } + SkipVersionCheck = metaCacheConfig.GetCacheSkipVersionCheck(); TStringBuf root = AppData(ctx)->PQConfig.GetRoot(); root.SkipPrefix("/"); @@ -113,7 +129,6 @@ public: } private: - void Reset(const TActorContext& ctx, bool error = true) { if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() || !AppData(ctx)->PQConfig.GetPQDiscoveryConfig().GetLBFrontEnabled()) { return; @@ -123,7 +138,20 @@ private: LastTopicKey = {}; Type = EQueryType::ECheckVersion; //TODO: on start there will be additional delay for VersionCheckInterval - ctx.Schedule(error ? QueryRetryInterval : VersionCheckInterval, new NActors::TEvents::TEvWakeup()); + ctx.Schedule( + error ? QueryRetryInterval : VersionCheckInterval, + new NActors::TEvents::TEvWakeup(static_cast<ui32>(EWakeupTag::WakeForQuery)) + ); + } + + void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { + auto tag = static_cast<EWakeupTag>(ev->Get()->Tag); + switch (tag) { + case EWakeupTag::WakeForQuery: + return StartQuery(ctx); + case EWakeupTag::WakeForHive: + return ProcessNodesInfoWork(ctx); + } } void SubscribeToClustersUpdate(const TActorContext& ctx) { @@ -151,6 +179,36 @@ private: Reset(ctx); } + void StartHivePipe(const TActorContext& ctx) { + auto hiveTabletId = GetHiveTabletId(ctx); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start pipe to hive tablet: " << hiveTabletId); + auto pipeRetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries(); + pipeRetryPolicy.MaxRetryTime = TDuration::Seconds(1); + NTabletPipe::TClientConfig pipeConfig{pipeRetryPolicy}; + HivePipeClient = ctx.RegisterWithSameMailbox( + NTabletPipe::CreateClient(ctx.SelfID, hiveTabletId, pipeConfig) + ); + } + + void HandlePipeConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + switch (ev->Get()->Status) { + case NKikimrProto::EReplyStatus::OK: + case NKikimrProto::EReplyStatus::ALREADY: + break; + default: + return HandlePipeDestroyed(ctx); + } + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Hive pipe connected"); + ProcessNodesInfoWork(ctx); + } + + void HandlePipeDestroyed(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Hive pipe destroyed"); + NTabletPipe::CloseClient(ctx, HivePipeClient); + HivePipeClient = TActorId(); + StartHivePipe(ctx); + ResetHiveRequestState(ctx); + } void RunQuery(EQueryType type, const TActorContext& ctx) { auto req = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); @@ -369,7 +427,17 @@ private: } }; + enum class EWakeupTag { + WakeForQuery = 1, + WakeForHive = 2 + }; private: + static ui64 GetHiveTabletId(const TActorContext& ctx) { + TDomainsInfo* domainsInfo = AppData(ctx)->DomainsInfo.Get(); + auto hiveTabletId = domainsInfo->GetHive(domainsInfo->GetDefaultHiveUid(domainsInfo->Domains.begin()->first)); + return hiveTabletId; + } + void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics"); const auto& msg = *ev->Get(); @@ -393,6 +461,11 @@ private: return ProcessDescribeAllTopics(ev->Sender, ctx); } + void HandleGetNodesMapping(TEvPqNewMetaCache::TEvGetNodesMappingRequest::TPtr& ev, const TActorContext& ctx) { + NodesMappingWaiters.emplace(std::move(ev->Sender)); + ProcessNodesInfoWork(ctx); + } + void ProcessDescribeAllTopics(const TActorId& waiter, const TActorContext& ctx) { if (EverGotTopics && CurrentTopics.empty()) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Describe all topics - send empty response"); @@ -529,6 +602,101 @@ private: ctx.Send(recipient, response); } + void ResetHiveRequestState(const TActorContext& ctx) { + if (NextHiveRequestDeadline == TInstant::Zero()) { + NextHiveRequestDeadline = ctx.Now() + TDuration::Seconds(5); + } + RequestedNodesInfo = false; + ctx.Schedule( + TDuration::Seconds(5), + new NActors::TEvents::TEvWakeup(static_cast<ui64>(EWakeupTag::WakeForHive)) + ); + } + + void ProcessNodesInfoWork(const TActorContext& ctx) { + if (OnDynNode) { + ProcessNodesInfoWaitersQueue(false, ctx); + return; + } + if (DynamicNodesMapping != nullptr && LastNodesInfoUpdate != TInstant::Zero()) { + const auto nextNodesUpdateTs = LastNodesInfoUpdate + TDuration::MilliSeconds( + AppData(ctx)->PQConfig.GetPQDiscoveryConfig().GetNodesMappingRescanIntervalMilliSeconds() + ); + if (ctx.Now() < nextNodesUpdateTs) + return ProcessNodesInfoWaitersQueue(true, ctx); + } + if (RequestedNodesInfo) + return; + + if (NextHiveRequestDeadline != TInstant::Zero() && ctx.Now() < NextHiveRequestDeadline) { + ResetHiveRequestState(ctx); + return; + } + NextHiveRequestDeadline = ctx.Now() + TDuration::Seconds(5); + RequestedNodesInfo = true; + + NActorsProto::TRemoteHttpInfo info; + auto* param = info.AddQueryParams(); + param->SetKey("page"); + param->SetValue("LandingData"); + info.SetPath("/app"); + NTabletPipe::SendData(ctx, HivePipeClient, new NActors::NMon::TEvRemoteHttpInfo(info)); + } + + void HandleHiveMonResponse(NMon::TEvRemoteJsonInfoRes::TPtr& ev, const TActorContext& ctx) { + ResetHiveRequestState(ctx); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got Hive landing data response: '" << ev->Get()->Json << "'"); + TStringInput input(ev->Get()->Json); + auto jsonValue = NJson::ReadJsonTree(&input, true); + const auto& rootMap = jsonValue.GetMap(); + ui32 aliveNodes = rootMap.find("AliveNodes")->second.GetUInteger(); + if (!aliveNodes) { + return; + } + const auto& nodes = rootMap.find("Nodes")->second.GetArray(); + TSet<ui32> staticNodeIds; + TVector<ui32> dynamicNodes; + ui64 maxStaticNodeId = 0; + for (const auto& node : nodes) { + const auto& nodeMap = node.GetMap(); + ui64 nodeId = nodeMap.find("Id")->second.GetUInteger(); + if (nodeMap.find("Domain")->second.GetString() == "/Root") { + maxStaticNodeId = std::max(maxStaticNodeId, nodeId); + if (nodeMap.find("Alive")->second.GetBoolean() && !nodeMap.find("Down")->second.GetBoolean()) { + staticNodeIds.insert(nodeId); + } + } else { + dynamicNodes.push_back(nodeId); + } + } + if (staticNodeIds.empty()) { + return; + } + DynamicNodesMapping.reset(new THashMap<ui32, ui32>()); + for (auto& dynNodeId : dynamicNodes) { + ui32 hash_ = dynNodeId % (maxStaticNodeId + 1); + auto iter = staticNodeIds.lower_bound(hash_); + DynamicNodesMapping->insert(std::make_pair( + dynNodeId, + iter == staticNodeIds.end() ? *staticNodeIds.begin() : *iter + )); + } + LastNodesInfoUpdate = ctx.Now(); + ProcessNodesInfoWaitersQueue(true, ctx); + } + + void ProcessNodesInfoWaitersQueue(bool status, const TActorContext& ctx) { + if (DynamicNodesMapping == nullptr) { + Y_VERIFY(!status); + DynamicNodesMapping.reset(new THashMap<ui32, ui32>); + } + while(!NodesMappingWaiters.empty()) { + ctx.Send(NodesMappingWaiters.front(), + new TEvPqNewMetaCache::TEvGetNodesMappingResponse(DynamicNodesMapping, status)); + NodesMappingWaiters.pop(); + } + } + void StartQuery(const TActorContext& ctx) { if (NewTopicsVersion > CurrentTopicsVersion) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start topics rescan"); @@ -546,14 +714,19 @@ public: } STRICT_STFUNC(StateFunc, - SFunc(NActors::TEvents::TEvWakeup, StartQuery) + HFunc(NActors::TEvents::TEvWakeup, HandleWakeup) HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, HandleClustersUpdate) HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse); HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleQueryResponse); HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion) HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics) HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics) + HFunc(TEvPqNewMetaCache::TEvGetNodesMappingRequest, HandleGetNodesMapping) HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse) + + HFunc(NMon::TEvRemoteJsonInfoRes, HandleHiveMonResponse) + SFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeDestroyed) + HFunc(TEvTabletPipe::TEvClientConnected, HandlePipeConnected) ) private: @@ -579,6 +752,7 @@ private: TInstant LastVersionUpdate = TInstant::Zero(); TQueue<TActorId> ListTopicsWaiters; + TQueue<TActorId> NodesMappingWaiters; THashMap<ui64, std::shared_ptr<TWaiter>> DescribeTopicsWaiters; TQueue<std::shared_ptr<TWaiter>> DescribeAllTopicsWaiters; bool HaveDescribeAllTopicsInflight = false; @@ -594,6 +768,14 @@ private: TString DbRoot; NPersQueue::TConverterFactoryPtr ConverterFactory; + TActorId HivePipeClient; + bool RequestedNodesInfo = false; + TInstant NextHiveRequestDeadline = TInstant::Zero(); + TInstant LastNodesInfoUpdate = TInstant::Now(); + bool OnDynNode = false; + + std::shared_ptr<THashMap<ui32, ui32>> DynamicNodesMapping; + }; IActor* CreatePQMetaCache(const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) { diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h index d03bb59c62..023814a97d 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.h +++ b/ydb/core/client/server/msgbus_server_pq_metacache.h @@ -45,6 +45,8 @@ struct TEvPqNewMetaCache { EvDescribeTopicsResponse, EvDescribeAllTopicsRequest, EvDescribeAllTopicsResponse, + EvGetNodesMappingRequest, + EvGetNodesMappingResponse, EvEnd }; @@ -109,6 +111,21 @@ struct TEvPqNewMetaCache { , Result(result) {} }; + + struct TEvGetNodesMappingRequest : public TEventLocal<TEvGetNodesMappingRequest, EvGetNodesMappingRequest> { + }; + + struct TEvGetNodesMappingResponse : public TEventLocal<TEvGetNodesMappingResponse, EvGetNodesMappingResponse> { + std::shared_ptr<THashMap<ui32, ui32>> NodesMapping; + bool Status; + + TEvGetNodesMappingResponse(const std::shared_ptr<THashMap<ui32, ui32>>& nodesMapping, bool status) + : NodesMapping(std::move(nodesMapping)) + , Status(status) + {} + + }; + }; IActor* CreatePQMetaCache(const ::NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval = TDuration::Seconds(1)); diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp index 3c22aec13c..974eec47eb 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp @@ -415,21 +415,27 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer( bool statusInitializing = false; if (ansIt->second.Get() != nullptr && ansIt->second->Get()->Status == NKikimrProto::OK) { const ui32 nodeId = ansIt->second->Get()->ServerId.NodeId(); - const auto hostName = NodesInfo->HostNames.find(nodeId); - if (hostName != NodesInfo->HostNames.end()) { - location.SetHost(hostName->second); + const auto hostNameIter = NodesInfo->HostNames.find(nodeId); + if (hostNameIter != NodesInfo->HostNames.end()) { + const auto& hostName = hostNameIter->second; + location.SetHost(hostName); + location.SetHostId(nodeId); + location.SetErrorCode(NPersQueue::NErrorCode::OK); - if (nodeId < pqConfig.GetMaxStorageNodeId()) { - location.SetHostId(nodeId); - } else if (pqConfig.GetDynNodePartitionLocationsMapping()) { - auto dynNodeIdIter = NodesInfo->DynToStaticNode.find(nodeId); - Y_VERIFY(!dynNodeIdIter.IsEnd()); - auto statNodeIdIter = NodesInfo->HostNames.find(dynNodeIdIter->second); - Y_VERIFY(!statNodeIdIter.IsEnd()); - location.SetHostId(statNodeIdIter->first); - location.SetHost(statNodeIdIter->second); - } else { - auto minIter = NodesInfo->MinNodeIdByHost.find(hostName->second); + if (pqConfig.GetPQDiscoveryConfig().GetUseDynNodesMapping()) { + auto dynNodeIdIter = NodesInfo->DynToStaticNode->find(nodeId); + if (!dynNodeIdIter.IsEnd()) { + auto statNodeIdIter = NodesInfo->HostNames.find(dynNodeIdIter->second); + if (statNodeIdIter.IsEnd()) { + statusInitializing = true; + } else { + location.SetHostId(statNodeIdIter->first); + location.SetHost(statNodeIdIter->second); + } + } + } else if (nodeId > pqConfig.GetMaxStorageNodeId()) { + auto minIter = NodesInfo->MinNodeIdByHost.find(hostName); + // location.SetHost(hostName); - already done before if (minIter.IsEnd()) { location.SetHostId(nodeId); } else { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index bb86acaf7f..9aaf0bb96f 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -173,6 +173,8 @@ message TPQConfig { optional bool LBFrontEnabled = 3 [default = true]; optional bool UseLbAccountAlias = 4 [default = true]; optional string LbUserDatabaseRoot = 5 [default = ""]; + optional bool UseDynNodesMapping = 6 [default = false]; + optional uint64 NodesMappingRescanIntervalMilliSeconds = 7 [default = 10000]; } optional TPQDiscoveryConfig PQDiscoveryConfig = 46; @@ -184,7 +186,6 @@ message TPQConfig { } optional TMoveTopicActorConfig MoveTopicActorConfig = 51; - optional bool DynNodePartitionLocationsMapping = 54 [default = false]; } message TChannelProfile { |