aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-02-14 14:05:38 +0300
committerkomels <komels@ydb.tech>2023-02-14 14:05:38 +0300
commita01a146b112ce014853d21b4a11daa88a877c892 (patch)
tree5ca6e1976252425f9fab1dd71d3fd31b767b3d9d
parente0b2154fab00a08c07439524ad258e459b7bc4d3 (diff)
downloadydb-a01a146b112ce014853d21b4a11daa88a877c892.tar.gz
- full mapping with hive interaction
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp72
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.h16
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp188
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.h17
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metarequest.cpp34
-rw-r--r--ydb/core/protos/pqconfig.proto3
6 files changed, 250 insertions, 80 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index 0f0437c673..a7de796c8f 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -214,10 +214,9 @@ void TPersQueueBaseRequestProcessor::Die(const TActorContext& ctx) {
STFUNC(TPersQueueBaseRequestProcessor::StateFunc) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvInterconnect::TEvNodesInfo, Handle);
- HFunc(TInterconnectProxyTCP::TEvStats, Handle)
- CFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse, Handle);
HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse, Handle);
+ HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvGetNodesMappingResponse, Handle);
HFunc(TEvPersQueue::TEvResponse, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
CFunc(NActors::TEvents::TSystem::PoisonPill, Die);
@@ -251,17 +250,10 @@ void TPersQueueBaseRequestProcessor::Handle(TEvInterconnect::TEvNodesInfo::TPtr&
}
}
-void TPersQueueBaseRequestProcessor::Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx) {
- NodesInfo->PingReply(ev, ctx);
- if (ReadyToCreateChildren()) {
- if (CreateChildren(ctx)) {
- return;
- }
- }
-}
-
-void TPersQueueBaseRequestProcessor::HandleUndelivered(const TActorContext& ctx) {
- NodesInfo->PingFailed(ctx);
+void TPersQueueBaseRequestProcessor::Handle(
+ TEvPqNewMetaCache::TEvGetNodesMappingResponse::TPtr& ev, const TActorContext& ctx
+) {
+ NodesInfo->ProcessNodesMapping(ev, ctx);
if (ReadyToCreateChildren()) {
if (CreateChildren(ctx)) {
return;
@@ -337,10 +329,14 @@ void TPersQueueBaseRequestProcessor::Handle(
}
bool TPersQueueBaseRequestProcessor::ReadyToCreateChildren() const {
- return TopicsDescription && (!ListNodes || (NodesInfo.get() != nullptr && NodesInfo->Ready));
+ return TopicsDescription
+ && (!ListNodes || (NodesInfo.get() != nullptr && NodesInfo->Ready));
}
bool TPersQueueBaseRequestProcessor::CreateChildren(const TActorContext& ctx) {
+ if (ChildrenCreationDone)
+ return false;
+ ChildrenCreationDone = true;
Y_VERIFY(TopicsDescription->ResultSet.size() == TopicsConverters.size());
ui32 i = 0;
for (const auto& entry : TopicsDescription->ResultSet) {
@@ -379,6 +375,7 @@ bool TPersQueueBaseRequestProcessor::CreateChildrenIfNeeded(const TActorContext&
NeedChildrenCreation = false;
THashSet<TString> topics;
+
while (!ChildrenToCreate.empty()) {
THolder<TPerTopicInfo> perTopicInfo(ChildrenToCreate.front().Release());
ChildrenToCreate.pop_front();
@@ -402,7 +399,6 @@ bool TPersQueueBaseRequestProcessor::CreateChildrenIfNeeded(const TActorContext&
Children.emplace(actorId, std::move(perTopicInfo));
}
}
-
Y_VERIFY(topics.size() == Children.size());
if (!TopicsToRequest.empty() && TopicsToRequest.size() != topics.size()) {
@@ -442,22 +438,15 @@ TPersQueueBaseRequestProcessor::TNodesInfo::TNodesInfo(
: NodesInfoReply(std::move(nodesInfoReply))
{
const auto& pqConfig = AppData(ctx)->PQConfig;
- bool useMapping = pqConfig.GetDynNodePartitionLocationsMapping();
+ bool useMapping = pqConfig.GetPQDiscoveryConfig().GetUseDynNodesMapping();
HostNames.reserve(NodesInfoReply->Nodes.size());
for (const NActors::TEvInterconnect::TNodeInfo& info : NodesInfoReply->Nodes) {
HostNames.emplace(info.NodeId, info.Host);
if (useMapping) {
- TActorSystem* const as = ctx.ExecutorThread.ActorSystem;
- if (info.NodeId > pqConfig.GetMaxStorageNodeId()) {
- DynNodes.push_back(info.NodeId);
- } else {
- ctx.Send(as->InterconnectProxy(info.NodeId), new TInterconnectProxyTCP::TEvQueryStats,
- IEventHandle::FlagTrackDelivery);
- ++NodesPingsPending;
- if (MaxStaticNodeId < info.NodeId) {
- MaxStaticNodeId = info.NodeId;
- }
- }
+ ctx.Send(
+ CreatePersQueueMetaCacheV2Id(),
+ new TEvPqNewMetaCache::TEvGetNodesMappingRequest()
+ );
} else {
Ready = true;
auto insRes = MinNodeIdByHost.insert(std::make_pair(info.Host, info.NodeId));
@@ -478,35 +467,14 @@ TTopicInfoBasedActor::TTopicInfoBasedActor(const TSchemeEntry& topicEntry, const
{
}
-void TPersQueueBaseRequestProcessor::TNodesInfo::PingReply(
- TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx
+void TPersQueueBaseRequestProcessor::TNodesInfo::ProcessNodesMapping(
+ TEvPqNewMetaCache::TEvGetNodesMappingResponse::TPtr& ev, const TActorContext&
) {
- --NodesPingsPending;
- if (ev->Get()->ProxyStats.Connected) {
- StaticNodes.insert(ev->Get()->PeerNodeId);
- }
- FinalizeWhenReady(ctx);
-}
-
-void TPersQueueBaseRequestProcessor::TNodesInfo::PingFailed(const TActorContext& ctx) {
- --NodesPingsPending;
- FinalizeWhenReady(ctx);
-}
-
-
-void TPersQueueBaseRequestProcessor::TNodesInfo::FinalizeWhenReady(const TActorContext& ctx) {
- if (NodesPingsPending)
- Finalize(ctx);
-}
-
-void TPersQueueBaseRequestProcessor::TNodesInfo::Finalize(const TActorContext&) {
- for (auto& dynNodeId : DynNodes) {
- ui32 hash_ = dynNodeId % (MaxStaticNodeId + 1);
- DynToStaticNode[dynNodeId] = *StaticNodes.lower_bound(hash_);
- }
+ DynToStaticNode = std::move(ev->Get()->NodesMapping);
Ready = true;
}
+
void TTopicInfoBasedActor::Bootstrap(const TActorContext &ctx) {
Become(&TTopicInfoBasedActor::StateFunc);
BootstrapImpl(ctx);
diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h
index de5ed1c2a1..720527078a 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.h
+++ b/ydb/core/client/server/msgbus_server_persqueue.h
@@ -85,19 +85,15 @@ public:
THolder<TEvInterconnect::TEvNodesInfo> NodesInfoReply;
THashMap<ui32, TString> HostNames;
THashMap<TString, ui32> MinNodeIdByHost;
- THashMap<ui32, ui32> DynToStaticNode;
+ std::shared_ptr<THashMap<ui32, ui32>> DynToStaticNode;
+
bool Ready = false;
- void PingReply(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx);
- void PingFailed(const TActorContext& ctx);
+ void ProcessNodesMapping(NPqMetaCacheV2::TEvPqNewMetaCache::TEvGetNodesMappingResponse::TPtr& ev,
+ const TActorContext& ctx);
explicit TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply, const TActorContext& ctx);
private:
void FinalizeWhenReady(const TActorContext& ctx);
void Finalize(const TActorContext& ctx);
-
- ui64 NodesPingsPending = 0;
- ui64 MaxStaticNodeId = 0;
- TVector<ui32> DynNodes;
- TSet<ui64> StaticNodes;
};
public:
@@ -107,6 +103,7 @@ public:
bool NeedChildrenCreation = false;
ui32 ChildrenCreated = 0;
+ bool ChildrenCreationDone = false;
std::deque<THolder<TPerTopicInfo>> ChildrenToCreate;
@@ -143,8 +140,7 @@ protected:
virtual STFUNC(StateFunc);
void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev, const TActorContext& ctx);
- void Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx);
- void HandleUndelivered(const TActorContext& ctx);
+ void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvGetNodesMappingResponse::TPtr& ev, const TActorContext& ctx);
void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx);
void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index ff0501fb45..91e7e7dac7 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -14,6 +14,9 @@
#include <ydb/core/persqueue/pq_database.h>
#include <ydb/core/persqueue/cluster_tracker.h>
+#include <library/cpp/actors/protos/actors.pb.h>
+#include <library/cpp/actors/core/mon.h>
+#include <library/cpp/json/json_reader.h>
namespace NKikimr::NMsgBusProxy {
@@ -94,6 +97,19 @@ public:
if (!metaCacheConfig.GetLBFrontEnabled()) {
return;
}
+ if (metaCacheConfig.GetUseDynNodesMapping()) {
+ TStringBuf tenant = AppData(ctx)->TenantName;
+ tenant.SkipPrefix("/");
+ tenant.ChopSuffix("/");
+ if (tenant != "Root") {
+ LOG_NOTICE_S(ctx, NKikimrServices::PQ_METACACHE, "Started on tenant = '" << tenant << "', will not request hive");
+ OnDynNode = true;
+ } else {
+ StartHivePipe(ctx);
+ ProcessNodesInfoWork(ctx);
+ }
+ }
+
SkipVersionCheck = metaCacheConfig.GetCacheSkipVersionCheck();
TStringBuf root = AppData(ctx)->PQConfig.GetRoot();
root.SkipPrefix("/");
@@ -113,7 +129,6 @@ public:
}
private:
-
void Reset(const TActorContext& ctx, bool error = true) {
if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() || !AppData(ctx)->PQConfig.GetPQDiscoveryConfig().GetLBFrontEnabled()) {
return;
@@ -123,7 +138,20 @@ private:
LastTopicKey = {};
Type = EQueryType::ECheckVersion;
//TODO: on start there will be additional delay for VersionCheckInterval
- ctx.Schedule(error ? QueryRetryInterval : VersionCheckInterval, new NActors::TEvents::TEvWakeup());
+ ctx.Schedule(
+ error ? QueryRetryInterval : VersionCheckInterval,
+ new NActors::TEvents::TEvWakeup(static_cast<ui32>(EWakeupTag::WakeForQuery))
+ );
+ }
+
+ void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
+ auto tag = static_cast<EWakeupTag>(ev->Get()->Tag);
+ switch (tag) {
+ case EWakeupTag::WakeForQuery:
+ return StartQuery(ctx);
+ case EWakeupTag::WakeForHive:
+ return ProcessNodesInfoWork(ctx);
+ }
}
void SubscribeToClustersUpdate(const TActorContext& ctx) {
@@ -151,6 +179,36 @@ private:
Reset(ctx);
}
+ void StartHivePipe(const TActorContext& ctx) {
+ auto hiveTabletId = GetHiveTabletId(ctx);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start pipe to hive tablet: " << hiveTabletId);
+ auto pipeRetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries();
+ pipeRetryPolicy.MaxRetryTime = TDuration::Seconds(1);
+ NTabletPipe::TClientConfig pipeConfig{pipeRetryPolicy};
+ HivePipeClient = ctx.RegisterWithSameMailbox(
+ NTabletPipe::CreateClient(ctx.SelfID, hiveTabletId, pipeConfig)
+ );
+ }
+
+ void HandlePipeConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
+ switch (ev->Get()->Status) {
+ case NKikimrProto::EReplyStatus::OK:
+ case NKikimrProto::EReplyStatus::ALREADY:
+ break;
+ default:
+ return HandlePipeDestroyed(ctx);
+ }
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Hive pipe connected");
+ ProcessNodesInfoWork(ctx);
+ }
+
+ void HandlePipeDestroyed(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Hive pipe destroyed");
+ NTabletPipe::CloseClient(ctx, HivePipeClient);
+ HivePipeClient = TActorId();
+ StartHivePipe(ctx);
+ ResetHiveRequestState(ctx);
+ }
void RunQuery(EQueryType type, const TActorContext& ctx) {
auto req = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
@@ -369,7 +427,17 @@ private:
}
};
+ enum class EWakeupTag {
+ WakeForQuery = 1,
+ WakeForHive = 2
+ };
private:
+ static ui64 GetHiveTabletId(const TActorContext& ctx) {
+ TDomainsInfo* domainsInfo = AppData(ctx)->DomainsInfo.Get();
+ auto hiveTabletId = domainsInfo->GetHive(domainsInfo->GetDefaultHiveUid(domainsInfo->Domains.begin()->first));
+ return hiveTabletId;
+ }
+
void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics");
const auto& msg = *ev->Get();
@@ -393,6 +461,11 @@ private:
return ProcessDescribeAllTopics(ev->Sender, ctx);
}
+ void HandleGetNodesMapping(TEvPqNewMetaCache::TEvGetNodesMappingRequest::TPtr& ev, const TActorContext& ctx) {
+ NodesMappingWaiters.emplace(std::move(ev->Sender));
+ ProcessNodesInfoWork(ctx);
+ }
+
void ProcessDescribeAllTopics(const TActorId& waiter, const TActorContext& ctx) {
if (EverGotTopics && CurrentTopics.empty()) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Describe all topics - send empty response");
@@ -529,6 +602,101 @@ private:
ctx.Send(recipient, response);
}
+ void ResetHiveRequestState(const TActorContext& ctx) {
+ if (NextHiveRequestDeadline == TInstant::Zero()) {
+ NextHiveRequestDeadline = ctx.Now() + TDuration::Seconds(5);
+ }
+ RequestedNodesInfo = false;
+ ctx.Schedule(
+ TDuration::Seconds(5),
+ new NActors::TEvents::TEvWakeup(static_cast<ui64>(EWakeupTag::WakeForHive))
+ );
+ }
+
+ void ProcessNodesInfoWork(const TActorContext& ctx) {
+ if (OnDynNode) {
+ ProcessNodesInfoWaitersQueue(false, ctx);
+ return;
+ }
+ if (DynamicNodesMapping != nullptr && LastNodesInfoUpdate != TInstant::Zero()) {
+ const auto nextNodesUpdateTs = LastNodesInfoUpdate + TDuration::MilliSeconds(
+ AppData(ctx)->PQConfig.GetPQDiscoveryConfig().GetNodesMappingRescanIntervalMilliSeconds()
+ );
+ if (ctx.Now() < nextNodesUpdateTs)
+ return ProcessNodesInfoWaitersQueue(true, ctx);
+ }
+ if (RequestedNodesInfo)
+ return;
+
+ if (NextHiveRequestDeadline != TInstant::Zero() && ctx.Now() < NextHiveRequestDeadline) {
+ ResetHiveRequestState(ctx);
+ return;
+ }
+ NextHiveRequestDeadline = ctx.Now() + TDuration::Seconds(5);
+ RequestedNodesInfo = true;
+
+ NActorsProto::TRemoteHttpInfo info;
+ auto* param = info.AddQueryParams();
+ param->SetKey("page");
+ param->SetValue("LandingData");
+ info.SetPath("/app");
+ NTabletPipe::SendData(ctx, HivePipeClient, new NActors::NMon::TEvRemoteHttpInfo(info));
+ }
+
+ void HandleHiveMonResponse(NMon::TEvRemoteJsonInfoRes::TPtr& ev, const TActorContext& ctx) {
+ ResetHiveRequestState(ctx);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got Hive landing data response: '" << ev->Get()->Json << "'");
+ TStringInput input(ev->Get()->Json);
+ auto jsonValue = NJson::ReadJsonTree(&input, true);
+ const auto& rootMap = jsonValue.GetMap();
+ ui32 aliveNodes = rootMap.find("AliveNodes")->second.GetUInteger();
+ if (!aliveNodes) {
+ return;
+ }
+ const auto& nodes = rootMap.find("Nodes")->second.GetArray();
+ TSet<ui32> staticNodeIds;
+ TVector<ui32> dynamicNodes;
+ ui64 maxStaticNodeId = 0;
+ for (const auto& node : nodes) {
+ const auto& nodeMap = node.GetMap();
+ ui64 nodeId = nodeMap.find("Id")->second.GetUInteger();
+ if (nodeMap.find("Domain")->second.GetString() == "/Root") {
+ maxStaticNodeId = std::max(maxStaticNodeId, nodeId);
+ if (nodeMap.find("Alive")->second.GetBoolean() && !nodeMap.find("Down")->second.GetBoolean()) {
+ staticNodeIds.insert(nodeId);
+ }
+ } else {
+ dynamicNodes.push_back(nodeId);
+ }
+ }
+ if (staticNodeIds.empty()) {
+ return;
+ }
+ DynamicNodesMapping.reset(new THashMap<ui32, ui32>());
+ for (auto& dynNodeId : dynamicNodes) {
+ ui32 hash_ = dynNodeId % (maxStaticNodeId + 1);
+ auto iter = staticNodeIds.lower_bound(hash_);
+ DynamicNodesMapping->insert(std::make_pair(
+ dynNodeId,
+ iter == staticNodeIds.end() ? *staticNodeIds.begin() : *iter
+ ));
+ }
+ LastNodesInfoUpdate = ctx.Now();
+ ProcessNodesInfoWaitersQueue(true, ctx);
+ }
+
+ void ProcessNodesInfoWaitersQueue(bool status, const TActorContext& ctx) {
+ if (DynamicNodesMapping == nullptr) {
+ Y_VERIFY(!status);
+ DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
+ }
+ while(!NodesMappingWaiters.empty()) {
+ ctx.Send(NodesMappingWaiters.front(),
+ new TEvPqNewMetaCache::TEvGetNodesMappingResponse(DynamicNodesMapping, status));
+ NodesMappingWaiters.pop();
+ }
+ }
+
void StartQuery(const TActorContext& ctx) {
if (NewTopicsVersion > CurrentTopicsVersion) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start topics rescan");
@@ -546,14 +714,19 @@ public:
}
STRICT_STFUNC(StateFunc,
- SFunc(NActors::TEvents::TEvWakeup, StartQuery)
+ HFunc(NActors::TEvents::TEvWakeup, HandleWakeup)
HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, HandleClustersUpdate)
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleQueryResponse);
HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion)
HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics)
HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics)
+ HFunc(TEvPqNewMetaCache::TEvGetNodesMappingRequest, HandleGetNodesMapping)
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse)
+
+ HFunc(NMon::TEvRemoteJsonInfoRes, HandleHiveMonResponse)
+ SFunc(TEvTabletPipe::TEvClientDestroyed, HandlePipeDestroyed)
+ HFunc(TEvTabletPipe::TEvClientConnected, HandlePipeConnected)
)
private:
@@ -579,6 +752,7 @@ private:
TInstant LastVersionUpdate = TInstant::Zero();
TQueue<TActorId> ListTopicsWaiters;
+ TQueue<TActorId> NodesMappingWaiters;
THashMap<ui64, std::shared_ptr<TWaiter>> DescribeTopicsWaiters;
TQueue<std::shared_ptr<TWaiter>> DescribeAllTopicsWaiters;
bool HaveDescribeAllTopicsInflight = false;
@@ -594,6 +768,14 @@ private:
TString DbRoot;
NPersQueue::TConverterFactoryPtr ConverterFactory;
+ TActorId HivePipeClient;
+ bool RequestedNodesInfo = false;
+ TInstant NextHiveRequestDeadline = TInstant::Zero();
+ TInstant LastNodesInfoUpdate = TInstant::Now();
+ bool OnDynNode = false;
+
+ std::shared_ptr<THashMap<ui32, ui32>> DynamicNodesMapping;
+
};
IActor* CreatePQMetaCache(const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) {
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h
index d03bb59c62..023814a97d 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.h
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.h
@@ -45,6 +45,8 @@ struct TEvPqNewMetaCache {
EvDescribeTopicsResponse,
EvDescribeAllTopicsRequest,
EvDescribeAllTopicsResponse,
+ EvGetNodesMappingRequest,
+ EvGetNodesMappingResponse,
EvEnd
};
@@ -109,6 +111,21 @@ struct TEvPqNewMetaCache {
, Result(result)
{}
};
+
+ struct TEvGetNodesMappingRequest : public TEventLocal<TEvGetNodesMappingRequest, EvGetNodesMappingRequest> {
+ };
+
+ struct TEvGetNodesMappingResponse : public TEventLocal<TEvGetNodesMappingResponse, EvGetNodesMappingResponse> {
+ std::shared_ptr<THashMap<ui32, ui32>> NodesMapping;
+ bool Status;
+
+ TEvGetNodesMappingResponse(const std::shared_ptr<THashMap<ui32, ui32>>& nodesMapping, bool status)
+ : NodesMapping(std::move(nodesMapping))
+ , Status(status)
+ {}
+
+ };
+
};
IActor* CreatePQMetaCache(const ::NMonitoring::TDynamicCounterPtr& counters,
const TDuration& versionCheckInterval = TDuration::Seconds(1));
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
index 3c22aec13c..974eec47eb 100644
--- a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
@@ -415,21 +415,27 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer(
bool statusInitializing = false;
if (ansIt->second.Get() != nullptr && ansIt->second->Get()->Status == NKikimrProto::OK) {
const ui32 nodeId = ansIt->second->Get()->ServerId.NodeId();
- const auto hostName = NodesInfo->HostNames.find(nodeId);
- if (hostName != NodesInfo->HostNames.end()) {
- location.SetHost(hostName->second);
+ const auto hostNameIter = NodesInfo->HostNames.find(nodeId);
+ if (hostNameIter != NodesInfo->HostNames.end()) {
+ const auto& hostName = hostNameIter->second;
+ location.SetHost(hostName);
+ location.SetHostId(nodeId);
+
location.SetErrorCode(NPersQueue::NErrorCode::OK);
- if (nodeId < pqConfig.GetMaxStorageNodeId()) {
- location.SetHostId(nodeId);
- } else if (pqConfig.GetDynNodePartitionLocationsMapping()) {
- auto dynNodeIdIter = NodesInfo->DynToStaticNode.find(nodeId);
- Y_VERIFY(!dynNodeIdIter.IsEnd());
- auto statNodeIdIter = NodesInfo->HostNames.find(dynNodeIdIter->second);
- Y_VERIFY(!statNodeIdIter.IsEnd());
- location.SetHostId(statNodeIdIter->first);
- location.SetHost(statNodeIdIter->second);
- } else {
- auto minIter = NodesInfo->MinNodeIdByHost.find(hostName->second);
+ if (pqConfig.GetPQDiscoveryConfig().GetUseDynNodesMapping()) {
+ auto dynNodeIdIter = NodesInfo->DynToStaticNode->find(nodeId);
+ if (!dynNodeIdIter.IsEnd()) {
+ auto statNodeIdIter = NodesInfo->HostNames.find(dynNodeIdIter->second);
+ if (statNodeIdIter.IsEnd()) {
+ statusInitializing = true;
+ } else {
+ location.SetHostId(statNodeIdIter->first);
+ location.SetHost(statNodeIdIter->second);
+ }
+ }
+ } else if (nodeId > pqConfig.GetMaxStorageNodeId()) {
+ auto minIter = NodesInfo->MinNodeIdByHost.find(hostName);
+ // location.SetHost(hostName); - already done before
if (minIter.IsEnd()) {
location.SetHostId(nodeId);
} else {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index bb86acaf7f..9aaf0bb96f 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -173,6 +173,8 @@ message TPQConfig {
optional bool LBFrontEnabled = 3 [default = true];
optional bool UseLbAccountAlias = 4 [default = true];
optional string LbUserDatabaseRoot = 5 [default = ""];
+ optional bool UseDynNodesMapping = 6 [default = false];
+ optional uint64 NodesMappingRescanIntervalMilliSeconds = 7 [default = 10000];
}
optional TPQDiscoveryConfig PQDiscoveryConfig = 46;
@@ -184,7 +186,6 @@ message TPQConfig {
}
optional TMoveTopicActorConfig MoveTopicActorConfig = 51;
- optional bool DynNodePartitionLocationsMapping = 54 [default = false];
}
message TChannelProfile {