diff options
author | komels <komels@ydb.tech> | 2023-02-07 19:26:17 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2023-02-07 19:26:17 +0300 |
commit | a3f17a0c71ec03358e0fc9a2f094991491a3f831 (patch) | |
tree | ef5b736c84f5c4c5cd404a984518ae3748c882e3 | |
parent | 36035fdcc353a549c237a3ce61eedbca101cfd2d (diff) | |
download | ydb-a3f17a0c71ec03358e0fc9a2f094991491a3f831.tar.gz |
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.cpp | 82 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.h | 25 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metarequest.cpp | 13 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 3 |
4 files changed, 108 insertions, 15 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index c06dfbc5ad..0f0437c673 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -214,6 +214,8 @@ void TPersQueueBaseRequestProcessor::Die(const TActorContext& ctx) { STFUNC(TPersQueueBaseRequestProcessor::StateFunc) { switch (ev->GetTypeRewrite()) { HFunc(TEvInterconnect::TEvNodesInfo, Handle); + HFunc(TInterconnectProxyTCP::TEvStats, Handle) + CFunc(TEvents::TSystem::Undelivered, HandleUndelivered) HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse, Handle); HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse, Handle); HFunc(TEvPersQueue::TEvResponse, Handle); @@ -241,7 +243,25 @@ void TPersQueueBaseRequestProcessor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, void TPersQueueBaseRequestProcessor::Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev, const TActorContext& ctx) { Y_VERIFY(ListNodes); - NodesInfo.reset(new TNodesInfo(ev->Release())); + NodesInfo.reset(new TNodesInfo(ev->Release(), ctx)); + if (ReadyToCreateChildren()) { + if (CreateChildren(ctx)) { + return; + } + } +} + +void TPersQueueBaseRequestProcessor::Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx) { + NodesInfo->PingReply(ev, ctx); + if (ReadyToCreateChildren()) { + if (CreateChildren(ctx)) { + return; + } + } +} + +void TPersQueueBaseRequestProcessor::HandleUndelivered(const TActorContext& ctx) { + NodesInfo->PingFailed(ctx); if (ReadyToCreateChildren()) { if (CreateChildren(ctx)) { return; @@ -317,7 +337,7 @@ void TPersQueueBaseRequestProcessor::Handle( } bool TPersQueueBaseRequestProcessor::ReadyToCreateChildren() const { - return TopicsDescription && (!ListNodes || NodesInfo.get() != nullptr); + return TopicsDescription && (!ListNodes || (NodesInfo.get() != nullptr && NodesInfo->Ready)); } bool TPersQueueBaseRequestProcessor::CreateChildren(const TActorContext& ctx) { @@ -416,16 +436,35 @@ NKikimrClient::TResponse TPersQueueBaseRequestProcessor::MergeSubactorReplies() return response; } -TPersQueueBaseRequestProcessor::TNodesInfo::TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply) +TPersQueueBaseRequestProcessor::TNodesInfo::TNodesInfo( + THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply, const TActorContext& ctx +) : NodesInfoReply(std::move(nodesInfoReply)) { + const auto& pqConfig = AppData(ctx)->PQConfig; + bool useMapping = pqConfig.GetDynNodePartitionLocationsMapping(); HostNames.reserve(NodesInfoReply->Nodes.size()); for (const NActors::TEvInterconnect::TNodeInfo& info : NodesInfoReply->Nodes) { HostNames.emplace(info.NodeId, info.Host); - auto insRes = MinNodeIdByHost.insert(std::make_pair(info.Host, info.NodeId)); - if (!insRes.second) { - if (insRes.first->second > info.NodeId) { - insRes.first->second = info.NodeId; + if (useMapping) { + TActorSystem* const as = ctx.ExecutorThread.ActorSystem; + if (info.NodeId > pqConfig.GetMaxStorageNodeId()) { + DynNodes.push_back(info.NodeId); + } else { + ctx.Send(as->InterconnectProxy(info.NodeId), new TInterconnectProxyTCP::TEvQueryStats, + IEventHandle::FlagTrackDelivery); + ++NodesPingsPending; + if (MaxStaticNodeId < info.NodeId) { + MaxStaticNodeId = info.NodeId; + } + } + } else { + Ready = true; + auto insRes = MinNodeIdByHost.insert(std::make_pair(info.Host, info.NodeId)); + if (!insRes.second) { + if (insRes.first->second > info.NodeId) { + insRes.first->second = info.NodeId; + } } } } @@ -439,6 +478,35 @@ TTopicInfoBasedActor::TTopicInfoBasedActor(const TSchemeEntry& topicEntry, const { } +void TPersQueueBaseRequestProcessor::TNodesInfo::PingReply( + TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx +) { + --NodesPingsPending; + if (ev->Get()->ProxyStats.Connected) { + StaticNodes.insert(ev->Get()->PeerNodeId); + } + FinalizeWhenReady(ctx); +} + +void TPersQueueBaseRequestProcessor::TNodesInfo::PingFailed(const TActorContext& ctx) { + --NodesPingsPending; + FinalizeWhenReady(ctx); +} + + +void TPersQueueBaseRequestProcessor::TNodesInfo::FinalizeWhenReady(const TActorContext& ctx) { + if (NodesPingsPending) + Finalize(ctx); +} + +void TPersQueueBaseRequestProcessor::TNodesInfo::Finalize(const TActorContext&) { + for (auto& dynNodeId : DynNodes) { + ui32 hash_ = dynNodeId % (MaxStaticNodeId + 1); + DynToStaticNode[dynNodeId] = *StaticNodes.lower_bound(hash_); + } + Ready = true; +} + void TTopicInfoBasedActor::Bootstrap(const TActorContext &ctx) { Become(&TTopicInfoBasedActor::StateFunc); BootstrapImpl(ctx); diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h index a946ab3abb..de5ed1c2a1 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.h +++ b/ydb/core/client/server/msgbus_server_persqueue.h @@ -2,13 +2,13 @@ #include "grpc_server.h" #include "msgbus_tabletreq.h" - #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/persqueue/events/global.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> #include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> #include <util/generic/ptr.h> #include <util/system/compiler.h> @@ -80,12 +80,24 @@ protected: }; public: - struct TNodesInfo { + class TNodesInfo { + public: THolder<TEvInterconnect::TEvNodesInfo> NodesInfoReply; THashMap<ui32, TString> HostNames; THashMap<TString, ui32> MinNodeIdByHost; - - explicit TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply); + THashMap<ui32, ui32> DynToStaticNode; + bool Ready = false; + void PingReply(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx); + void PingFailed(const TActorContext& ctx); + explicit TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply, const TActorContext& ctx); + private: + void FinalizeWhenReady(const TActorContext& ctx); + void Finalize(const TActorContext& ctx); + + ui64 NodesPingsPending = 0; + ui64 MaxStaticNodeId = 0; + TVector<ui32> DynNodes; + TSet<ui64> StaticNodes; }; public: @@ -131,6 +143,8 @@ protected: virtual STFUNC(StateFunc); void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev, const TActorContext& ctx); + void Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx); + void HandleUndelivered(const TActorContext& ctx); void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx); void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx); @@ -152,7 +166,8 @@ protected: // Nodes info const bool ListNodes; - std::shared_ptr<const TNodesInfo> NodesInfo; + std::shared_ptr<TNodesInfo> NodesInfo; + ui64 NodesPingsPending = 0; }; // Helper actor that sends TEvGetBalancerDescribe and checks ACL (ACL is not implemented yet). diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp index 9a9d4dd0d0..3c22aec13c 100644 --- a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp @@ -397,8 +397,10 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer( response.SetErrorCode(code); if (!errorReason.empty()) response.SetErrorReason(errorReason); - if (code == NPersQueue::NErrorCode::OK) { + const auto& pqConfig = AppData(ctx)->PQConfig; + + if (code == NPersQueue::NErrorCode::OK) { auto& topicResult = *response.MutableMetaResponse()->MutableCmdGetPartitionLocationsResult()->AddTopicResult(); topicResult.SetTopic(Name); SetErrorCode(&topicResult, SchemeEntry); @@ -417,8 +419,15 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer( if (hostName != NodesInfo->HostNames.end()) { location.SetHost(hostName->second); location.SetErrorCode(NPersQueue::NErrorCode::OK); - if (nodeId < 1000) { + if (nodeId < pqConfig.GetMaxStorageNodeId()) { location.SetHostId(nodeId); + } else if (pqConfig.GetDynNodePartitionLocationsMapping()) { + auto dynNodeIdIter = NodesInfo->DynToStaticNode.find(nodeId); + Y_VERIFY(!dynNodeIdIter.IsEnd()); + auto statNodeIdIter = NodesInfo->HostNames.find(dynNodeIdIter->second); + Y_VERIFY(!statNodeIdIter.IsEnd()); + location.SetHostId(statNodeIdIter->first); + location.SetHost(statNodeIdIter->second); } else { auto minIter = NodesInfo->MinNodeIdByHost.find(hostName->second); if (minIter.IsEnd()) { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index b7ec68481f..bb86acaf7f 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -177,13 +177,14 @@ message TPQConfig { optional TPQDiscoveryConfig PQDiscoveryConfig = 46; optional uint32 MaxStorageNodePort = 50 [default = 19001]; + optional uint32 MaxStorageNodeId = 53 [default = 999]; message TMoveTopicActorConfig { repeated string AllowedUserSIDs = 1; } optional TMoveTopicActorConfig MoveTopicActorConfig = 51; - + optional bool DynNodePartitionLocationsMapping = 54 [default = false]; } message TChannelProfile { |