aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-02-07 19:26:17 +0300
committerkomels <komels@ydb.tech>2023-02-07 19:26:17 +0300
commita3f17a0c71ec03358e0fc9a2f094991491a3f831 (patch)
treeef5b736c84f5c4c5cd404a984518ae3748c882e3
parent36035fdcc353a549c237a3ce61eedbca101cfd2d (diff)
downloadydb-a3f17a0c71ec03358e0fc9a2f094991491a3f831.tar.gz
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp82
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.h25
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metarequest.cpp13
-rw-r--r--ydb/core/protos/pqconfig.proto3
4 files changed, 108 insertions, 15 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index c06dfbc5ad..0f0437c673 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -214,6 +214,8 @@ void TPersQueueBaseRequestProcessor::Die(const TActorContext& ctx) {
STFUNC(TPersQueueBaseRequestProcessor::StateFunc) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvInterconnect::TEvNodesInfo, Handle);
+ HFunc(TInterconnectProxyTCP::TEvStats, Handle)
+ CFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse, Handle);
HFunc(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse, Handle);
HFunc(TEvPersQueue::TEvResponse, Handle);
@@ -241,7 +243,25 @@ void TPersQueueBaseRequestProcessor::Handle(TEvPersQueue::TEvResponse::TPtr& ev,
void TPersQueueBaseRequestProcessor::Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev, const TActorContext& ctx) {
Y_VERIFY(ListNodes);
- NodesInfo.reset(new TNodesInfo(ev->Release()));
+ NodesInfo.reset(new TNodesInfo(ev->Release(), ctx)); // the TNodesInfo constructor uses ctx to read PQConfig and, with mapping enabled, to send TEvQueryStats to static nodes
+ if (ReadyToCreateChildren()) { // requires NodesInfo->Ready when ListNodes is set
+ if (CreateChildren(ctx)) {
+ return;
+ }
+ }
+}
+
+// Forwards an interconnect proxy TEvStats event to NodesInfo, which counts it as
+// one resolved ping, and then creates child actors if everything is ready.
+void TPersQueueBaseRequestProcessor::Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx) {
+    NodesInfo->PingReply(ev, ctx);
+    if (!ReadyToCreateChildren()) {
+        return;
+    }
+    // The returned flag is not needed: nothing else happens in this handler.
+    CreateChildren(ctx);
+}
+
+void TPersQueueBaseRequestProcessor::HandleUndelivered(const TActorContext& ctx) {
+ NodesInfo->PingFailed(ctx);
if (ReadyToCreateChildren()) {
if (CreateChildren(ctx)) {
return;
@@ -317,7 +337,7 @@ void TPersQueueBaseRequestProcessor::Handle(
}
bool TPersQueueBaseRequestProcessor::ReadyToCreateChildren() const {
- return TopicsDescription && (!ListNodes || NodesInfo.get() != nullptr);
+ return TopicsDescription && (!ListNodes || (NodesInfo.get() != nullptr && NodesInfo->Ready)); // Ready is set by the TNodesInfo constructor (mapping disabled) or by TNodesInfo::Finalize
}
bool TPersQueueBaseRequestProcessor::CreateChildren(const TActorContext& ctx) {
@@ -416,16 +436,35 @@ NKikimrClient::TResponse TPersQueueBaseRequestProcessor::MergeSubactorReplies()
return response;
}
-TPersQueueBaseRequestProcessor::TNodesInfo::TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply)
+TPersQueueBaseRequestProcessor::TNodesInfo::TNodesInfo(
+ THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply, const TActorContext& ctx
+)
: NodesInfoReply(std::move(nodesInfoReply))
{
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+ bool useMapping = pqConfig.GetDynNodePartitionLocationsMapping();
HostNames.reserve(NodesInfoReply->Nodes.size());
for (const NActors::TEvInterconnect::TNodeInfo& info : NodesInfoReply->Nodes) {
HostNames.emplace(info.NodeId, info.Host);
- auto insRes = MinNodeIdByHost.insert(std::make_pair(info.Host, info.NodeId));
- if (!insRes.second) {
- if (insRes.first->second > info.NodeId) {
- insRes.first->second = info.NodeId;
+ if (useMapping) {
+ TActorSystem* const as = ctx.ExecutorThread.ActorSystem;
+ if (info.NodeId > pqConfig.GetMaxStorageNodeId()) {
+ DynNodes.push_back(info.NodeId);
+ } else {
+ ctx.Send(as->InterconnectProxy(info.NodeId), new TInterconnectProxyTCP::TEvQueryStats,
+ IEventHandle::FlagTrackDelivery);
+ ++NodesPingsPending;
+ if (MaxStaticNodeId < info.NodeId) {
+ MaxStaticNodeId = info.NodeId;
+ }
+ }
+ } else {
+ Ready = true;
+ auto insRes = MinNodeIdByHost.insert(std::make_pair(info.Host, info.NodeId));
+ if (!insRes.second) {
+ if (insRes.first->second > info.NodeId) {
+ insRes.first->second = info.NodeId;
+ }
}
}
}
@@ -439,6 +478,35 @@ TTopicInfoBasedActor::TTopicInfoBasedActor(const TSchemeEntry& topicEntry, const
{
}
+// Accounts for one answered ping and records the peer in StaticNodes when the
+// proxy statistics report it as connected.
+void TPersQueueBaseRequestProcessor::TNodesInfo::PingReply(
+    TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx
+) {
+    const auto* stats = ev->Get();
+    --NodesPingsPending;
+    if (stats->ProxyStats.Connected) {
+        StaticNodes.insert(stats->PeerNodeId);
+    }
+    FinalizeWhenReady(ctx);
+}
+
+// Accounts for one ping that was not delivered; nothing is added to StaticNodes.
+void TPersQueueBaseRequestProcessor::TNodesInfo::PingFailed(const TActorContext& ctx) {
+    NodesPingsPending -= 1;
+    FinalizeWhenReady(ctx);
+}
+
+
+// Runs Finalize once no pings remain outstanding.
+void TPersQueueBaseRequestProcessor::TNodesInfo::FinalizeWhenReady(const TActorContext& ctx) {
+    // PingReply and PingFailed decrement the counter before calling here, so a
+    // value of zero means the last outstanding ping has just been accounted for.
+    // Finalizing while pings are still pending would build the mapping from an
+    // incomplete StaticNodes set.
+    if (NodesPingsPending == 0) {
+        Finalize(ctx);
+    }
+}
+
+// Fills DynToStaticNode with a static node for every entry of DynNodes and marks
+// the node info as ready. The context argument is unused.
+void TPersQueueBaseRequestProcessor::TNodesInfo::Finalize(const TActorContext&) {
+    for (const ui32 dynNodeId : DynNodes) {
+        if (StaticNodes.empty()) {
+            // No static node answered as connected, so there is nothing to map to.
+            // Map the node to itself so lookups in DynToStaticNode still succeed.
+            // NOTE(review): confirm this is the desired fallback.
+            DynToStaticNode[dynNodeId] = dynNodeId;
+            continue;
+        }
+        const ui64 hash = dynNodeId % (MaxStaticNodeId + 1);
+        auto target = StaticNodes.lower_bound(hash);
+        if (target == StaticNodes.end()) {
+            // StaticNodes holds only the nodes that replied as connected, so the
+            // hash can exceed its largest element; wrap around to the smallest.
+            target = StaticNodes.begin();
+        }
+        DynToStaticNode[dynNodeId] = static_cast<ui32>(*target);
+    }
+    Ready = true;
+}
+
void TTopicInfoBasedActor::Bootstrap(const TActorContext &ctx) {
Become(&TTopicInfoBasedActor::StateFunc);
BootstrapImpl(ctx);
diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h
index a946ab3abb..de5ed1c2a1 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.h
+++ b/ydb/core/client/server/msgbus_server_persqueue.h
@@ -2,13 +2,13 @@
#include "grpc_server.h"
#include "msgbus_tabletreq.h"
-
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <util/generic/ptr.h>
#include <util/system/compiler.h>
@@ -80,12 +80,24 @@ protected:
};
public:
- struct TNodesInfo {
+ class TNodesInfo {
+ public:
THolder<TEvInterconnect::TEvNodesInfo> NodesInfoReply;
THashMap<ui32, TString> HostNames;
THashMap<TString, ui32> MinNodeIdByHost;
-
- explicit TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply);
+ THashMap<ui32, ui32> DynToStaticNode; // dynamic node id -> static node id, filled by Finalize
+ bool Ready = false; // set by the constructor when mapping is disabled, otherwise by Finalize
+ void PingReply(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx); // counts an answered ping; connected peers go to StaticNodes
+ void PingFailed(const TActorContext& ctx); // counts a ping that was not delivered
+ explicit TNodesInfo(THolder<TEvInterconnect::TEvNodesInfo> nodesInfoReply, const TActorContext& ctx);
+ private:
+ void FinalizeWhenReady(const TActorContext& ctx);
+ void Finalize(const TActorContext& ctx);
+
+ ui64 NodesPingsPending = 0; // incremented per TEvQueryStats sent, decremented by PingReply/PingFailed
+ ui64 MaxStaticNodeId = 0; // largest node id among the nodes that were pinged
+ TVector<ui32> DynNodes; // node ids above PQConfig MaxStorageNodeId
+ TSet<ui64> StaticNodes; // pinged nodes whose proxy stats reported Connected
};
public:
@@ -131,6 +143,8 @@ protected:
virtual STFUNC(StateFunc);
void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev, const TActorContext& ctx);
+ void Handle(TInterconnectProxyTCP::TEvStats::TPtr& ev, const TActorContext& ctx);
+ void HandleUndelivered(const TActorContext& ctx);
void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx);
void Handle(NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx);
@@ -152,7 +166,8 @@ protected:
// Nodes info
const bool ListNodes;
- std::shared_ptr<const TNodesInfo> NodesInfo;
+ std::shared_ptr<TNodesInfo> NodesInfo;
+ ui64 NodesPingsPending = 0;
};
// Helper actor that sends TEvGetBalancerDescribe and checks ACL (ACL is not implemented yet).
diff --git a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
index 9a9d4dd0d0..3c22aec13c 100644
--- a/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metarequest.cpp
@@ -397,8 +397,10 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer(
response.SetErrorCode(code);
if (!errorReason.empty())
response.SetErrorReason(errorReason);
- if (code == NPersQueue::NErrorCode::OK) {
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+
+ if (code == NPersQueue::NErrorCode::OK) {
auto& topicResult = *response.MutableMetaResponse()->MutableCmdGetPartitionLocationsResult()->AddTopicResult();
topicResult.SetTopic(Name);
SetErrorCode(&topicResult, SchemeEntry);
@@ -417,8 +419,15 @@ void TPersQueueGetPartitionLocationsTopicWorker::Answer(
if (hostName != NodesInfo->HostNames.end()) {
location.SetHost(hostName->second);
location.SetErrorCode(NPersQueue::NErrorCode::OK);
- if (nodeId < 1000) {
+ if (nodeId < pqConfig.GetMaxStorageNodeId()) {
location.SetHostId(nodeId);
+ } else if (pqConfig.GetDynNodePartitionLocationsMapping()) {
+ auto dynNodeIdIter = NodesInfo->DynToStaticNode.find(nodeId);
+ Y_VERIFY(!dynNodeIdIter.IsEnd());
+ auto statNodeIdIter = NodesInfo->HostNames.find(dynNodeIdIter->second);
+ Y_VERIFY(!statNodeIdIter.IsEnd());
+ location.SetHostId(statNodeIdIter->first);
+ location.SetHost(statNodeIdIter->second);
} else {
auto minIter = NodesInfo->MinNodeIdByHost.find(hostName->second);
if (minIter.IsEnd()) {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index b7ec68481f..bb86acaf7f 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -177,13 +177,14 @@ message TPQConfig {
optional TPQDiscoveryConfig PQDiscoveryConfig = 46;
optional uint32 MaxStorageNodePort = 50 [default = 19001];
+ optional uint32 MaxStorageNodeId = 53 [default = 999];
message TMoveTopicActorConfig {
repeated string AllowedUserSIDs = 1;
}
optional TMoveTopicActorConfig MoveTopicActorConfig = 51;
-
+ optional bool DynNodePartitionLocationsMapping = 54 [default = false];
}
message TChannelProfile {