diff options
author | shumkovnd <[email protected]> | 2023-07-26 17:42:56 +0300 |
---|---|---|
committer | shumkovnd <[email protected]> | 2023-07-26 17:42:56 +0300 |
commit | 732af3036d7c8921f80eeb7127a8d817611224ab (patch) | |
tree | 048e055b8a9e799de3ad45040048f2d10aa1bd56 | |
parent | 3785d5f97965bccf048718d8717904cf50f9f8f9 (diff) |
KIKIMR-16187: fix gossip
-rw-r--r-- | ydb/core/base/board_lookup.cpp | 41 | ||||
-rw-r--r-- | ydb/core/base/board_publish.cpp | 41 | ||||
-rw-r--r-- | ydb/core/base/statestorage.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_ut.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 91 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp | 397 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.cpp | 71 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_ut.cpp | 27 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 5 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 12 |
13 files changed, 526 insertions, 182 deletions
diff --git a/ydb/core/base/board_lookup.cpp b/ydb/core/base/board_lookup.cpp index fc1e0f5fb28..c453dc754bb 100644 --- a/ydb/core/base/board_lookup.cpp +++ b/ydb/core/base/board_lookup.cpp @@ -27,6 +27,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { const EBoardLookupMode Mode; const ui32 StateStorageGroupId; const bool Subscriber; + TBoardRetrySettings BoardRetrySettings; static constexpr int MAX_REPLICAS_COUNT_EXP = 32; // Replicas.size() <= 2**MAX_REPLICAS_COUNT_EXP @@ -59,7 +60,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { bool IsScheduled = false; ui32 ReconnectNumber = 0; NMonotonic::TMonotonic LastReconnectAt = TMonotonic::Zero(); - TDuration CurrentDelay = TDuration::MilliSeconds(100); + TDuration CurrentDelay = TDuration::Zero(); }; TVector<TReplica> Replicas; @@ -69,13 +70,20 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { ui32 WaitForReplicasToSuccess; + const TDuration& GetCurrentDelay(TReplica& replica) { + if (replica.CurrentDelay == TDuration::Zero()) { + replica.CurrentDelay = BoardRetrySettings.StartDelayMs; + } + return replica.CurrentDelay; + } + TDuration GetReconnectDelayForReplica(TReplica& replica) { auto newDelay = replica.CurrentDelay; newDelay *= 2; - if (newDelay > TDuration::Seconds(5)) { - newDelay = TDuration::Seconds(5); + if (newDelay > BoardRetrySettings.MaxDelayMs) { + newDelay = BoardRetrySettings.MaxDelayMs; } - newDelay *= AppData()->RandomProvider->Uniform(100, 115); + newDelay *= AppData()->RandomProvider->Uniform(50, 200); newDelay /= 100; replica.CurrentDelay = newDelay; return replica.CurrentDelay; @@ -402,7 +410,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { CheckCompletion(); } - void ReconnectReplica(ui32 replicaIdx) { + void ReconnectReplica(ui32 replicaIdx, bool fromReconnect = false) { auto& replica = Replicas[replicaIdx]; if (!Subscriber) { @@ -413,12 +421,20 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { } auto now = TlsActivationContext->Monotonic(); - if (now - replica.LastReconnectAt < replica.CurrentDelay) { + if (now - replica.LastReconnectAt < GetCurrentDelay(replica)) { auto at = replica.LastReconnectAt + GetReconnectDelayForReplica(replica); replica.IsScheduled = true; Schedule(at - now, new TEvPrivate::TEvReconnectReplicas(replicaIdx)); return; } + if (!fromReconnect) { + auto delay = TDuration::Seconds(1); + delay *= AppData()->RandomProvider->Uniform(10, 200); + delay /= 100; + replica.IsScheduled = true; + Schedule(delay, new TEvPrivate::TEvReconnectReplicas(replicaIdx)); + return; + } replica.ReconnectNumber++; replica.State = EReplicaState::Reconnect; @@ -433,7 +449,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { void Handle(TEvPrivate::TEvReconnectReplicas::TPtr& ev) { const auto& idx = ev->Get()->ReplicaIdx; Replicas[idx].IsScheduled = false; - ReconnectReplica(idx); + ReconnectReplica(idx, true); } std::pair<ui64, ui64> DecodeCookie(ui64 cookie) { @@ -477,12 +493,15 @@ public: return NKikimrServices::TActivity::BOARD_LOOKUP_ACTOR; } - TBoardLookupActor(const TString &path, TActorId owner, EBoardLookupMode mode, ui32 groupId) + TBoardLookupActor( + const TString &path, TActorId owner, EBoardLookupMode mode, ui32 groupId, + TBoardRetrySettings boardRetrySettings) : Path(path) , Owner(owner) , Mode(mode) , StateStorageGroupId(groupId) , Subscriber(Mode == EBoardLookupMode::Subscription) + , BoardRetrySettings(std::move(boardRetrySettings)) {} void Bootstrap() { @@ -524,8 +543,10 @@ public: } }; -IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode) { - return new TBoardLookupActor(path, owner, mode, groupId); +IActor* CreateBoardLookupActor( + const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode, + TBoardRetrySettings boardRetrySettings) { + return new TBoardLookupActor(path, owner, mode, groupId, std::move(boardRetrySettings)); } } diff --git a/ydb/core/base/board_publish.cpp b/ydb/core/base/board_publish.cpp index 8b73b4ad407..c808bb0326c 100644 --- a/ydb/core/base/board_publish.cpp +++ b/ydb/core/base/board_publish.cpp @@ -100,10 +100,11 @@ class TBoardPublishActor : public TActorBootstrapped<TBoardPublishActor> { const ui32 StateStorageGroupId; const ui32 TtlMs; const bool Register; + const TBoardRetrySettings BoardRetrySettings; struct TRetryState { NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero(); - TDuration CurrentDelay = TDuration::MilliSeconds(100); + TDuration CurrentDelay = TDuration::Zero(); }; struct TReplicaPublishActorState { @@ -113,13 +114,20 @@ class TBoardPublishActor : public TActorBootstrapped<TBoardPublishActor> { THashMap<TActorId, TReplicaPublishActorState> ReplicaPublishActors; // replica -> publish actor + const TDuration& GetCurrentDelay(TRetryState& state) { + if (state.CurrentDelay == TDuration::Zero()) { + state.CurrentDelay = BoardRetrySettings.StartDelayMs; + } + return state.CurrentDelay; + } + TDuration GetRetryDelay(TRetryState& state) { auto newDelay = state.CurrentDelay; newDelay *= 2; - if (newDelay > TDuration::Seconds(1)) { - newDelay = TDuration::Seconds(1); + if (newDelay > BoardRetrySettings.MaxDelayMs) { + newDelay = BoardRetrySettings.MaxDelayMs; } - newDelay *= AppData()->RandomProvider->Uniform(10, 200); + newDelay *= AppData()->RandomProvider->Uniform(50, 200); newDelay /= 100; state.CurrentDelay = newDelay; return state.CurrentDelay; @@ -183,7 +191,7 @@ class TBoardPublishActor : public TActorBootstrapped<TBoardPublishActor> { const TActorId replica = ev->Get()->Replica; auto& retryState = ReplicaPublishActors[replica].RetryState; - if (now - retryState.LastRetryAt < retryState.CurrentDelay) { + if (now - retryState.LastRetryAt < GetCurrentDelay(retryState)) { auto at = retryState.LastRetryAt + GetRetryDelay(retryState); Schedule(at - now, new TEvPrivate::TEvRetryPublishActor(replica)); return; @@ -196,11 +204,19 @@ class TBoardPublishActor : public TActorBootstrapped<TBoardPublishActor> { void Handle(TEvPrivate::TEvRetryPublishActor::TPtr &ev) { const auto& replica = ev->Get()->Replica; - RetryReplica(replica); + RetryReplica(replica, true); } - void RetryReplica(const TActorId& replica) { + void RetryReplica(const TActorId& replica, bool fromRetry = false) { + if (!fromRetry) { + auto delay = TDuration::Seconds(2); + delay *= AppData()->RandomProvider->Uniform(50, 200); + delay /= 100; + Schedule(delay, new TEvPrivate::TEvRetryPublishActor(replica)); + return; + } + auto replicaPublishActorsIt = ReplicaPublishActors.find(replica); if (replicaPublishActorsIt == ReplicaPublishActors.end()) { return; @@ -221,13 +237,16 @@ public: return NKikimrServices::TActivity::BOARD_PUBLISH_ACTOR; } - TBoardPublishActor(const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg) + TBoardPublishActor( + const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg, + TBoardRetrySettings boardRetrySettings) : Path(path) , Payload(payload) , Owner(owner) , StateStorageGroupId(groupId) , TtlMs(ttlMs) , Register(reg) + , BoardRetrySettings(std::move(boardRetrySettings)) { Y_UNUSED(TtlMs); Y_UNUSED(Register); @@ -257,8 +276,10 @@ public: } }; -IActor* CreateBoardPublishActor(const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg) { - return new TBoardPublishActor(path, payload, owner, groupId, ttlMs, reg); +IActor* CreateBoardPublishActor( + const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg, + TBoardRetrySettings boardRetrySettings) { + return new TBoardPublishActor(path, payload, owner, groupId, ttlMs, reg, std::move(boardRetrySettings)); } TString MakeEndpointsBoardPath(const TString &database) { diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h index 261477786ee..38b6babefd8 100644 --- a/ydb/core/base/statestorage.h +++ b/ydb/core/base/statestorage.h @@ -540,6 +540,11 @@ enum class EBoardLookupMode { Subscription, }; +struct TBoardRetrySettings { + TDuration StartDelayMs = TDuration::MilliSeconds(2000); + TDuration MaxDelayMs = TDuration::MilliSeconds(5000); +}; + TIntrusivePtr<TStateStorageInfo> BuildStateStorageInfo(char (&namePrefix)[TActorId::MaxServiceIDLength], const NKikimrConfig::TDomainsConfig::TStateStorage& config); void BuildStateStorageInfos(const NKikimrConfig::TDomainsConfig::TStateStorage& config, TIntrusivePtr<TStateStorageInfo> &stateStorageInfo, @@ -556,8 +561,12 @@ IActor* CreateStateStorageTabletGuardian(ui64 tabletId, const TActorId &leader, IActor* CreateStateStorageFollowerGuardian(ui64 tabletId, const TActorId &follower); // created as followerCandidate IActor* CreateStateStorageBoardReplica(const TIntrusivePtr<TStateStorageInfo> &, ui32); IActor* CreateSchemeBoardReplica(const TIntrusivePtr<TStateStorageInfo>&, ui32); -IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode); -IActor* CreateBoardPublishActor(const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg); +IActor* CreateBoardLookupActor( + const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode, + TBoardRetrySettings boardRetrySettings = {}); +IActor* CreateBoardPublishActor( + const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg, + TBoardRetrySettings boardRetrySettings = {}); TString MakeEndpointsBoardPath(const TString &database); diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index bec70b79a1e..da37347c841 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -764,6 +764,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co RmSnapshotLatency = KqpGroup->GetHistogram( "RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1)); RmMaxSnapshotLatency = KqpGroup->GetCounter("RM/MaxSnapshotLatency", false); + RmNodeNumberInSnapshot = KqpGroup->GetCounter("RM/NodeNumberInSnapshot", false); /* Spilling */ SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 7cfc101f010..ad885cac5f8 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -362,6 +362,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError; NMonitoring::THistogramPtr RmSnapshotLatency; ::NMonitoring::TDynamicCounters::TCounterPtr RmMaxSnapshotLatency; + ::NMonitoring::TDynamicCounters::TCounterPtr RmNodeNumberInSnapshot; // Spilling counters ::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs; diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index d55ae12d647..e4380cf0621 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -99,6 +99,11 @@ NKikimrConfig::TTableServiceConfig MakeKqpResourceManagerConfig() { config.MutableResourceManager()->SetPublishStatisticsIntervalSec(0); config.MutableResourceManager()->SetEnableInstantMkqlMemoryAlloc(true); + auto* infoExchangerRetrySettings = config.MutableResourceManager()->MutableInfoExchangerSettings(); + auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings(); + exchangerSettings->SetStartDelayMs(10); + exchangerSettings->SetMaxDelayMs(10); + return config; } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 150fa41f327..a6d9d2dc24b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -194,7 +194,7 @@ public: TString endpointAddress; bool useSsl = false; ParseGrpcEndpoint(TokenAccessorConfig.GetEndpoint(), endpointAddress, useSsl); - + CredentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(endpointAddress, useSsl, caContent, TokenAccessorConfig.GetConnectionPoolSize()); } @@ -363,17 +363,25 @@ public: KQP_PROXY_LOG_D("Received node white board pool stats: " << pool.usage()); NodeResources.SetCpuUsage(pool.usage()); NodeResources.SetThreads(pool.threads()); + + PublishResourceUsage(); } void DoPublishResources() { - SendBoardPublishPoison(); - SendWhiteboardRequest(); + if (AppData()->TenantName.empty() || !SelfDataCenterId) { KQP_PROXY_LOG_E("Cannot start publishing usage, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty")); return; } + SendBoardPublishPoison(); + + if (TableServiceConfig.GetEnablePublishKqpProxyByRM()) { + Send(KqpRmServiceActor, std::make_unique<TEvKqp::TEvKqpProxyPublishRequest>()); + return; + } + auto groupId = GetDefaultStateStorageGroupId(AppData()->TenantName); if (!groupId) { KQP_PROXY_LOG_D("Unable to determine default state storage group id for database " << @@ -389,22 +397,14 @@ public: } void PublishResourceUsage() { - const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); - auto now = TAppData::TimeProvider->Now(); - TDuration batchingInterval = TDuration::MilliSeconds(sbs.GetBoardPublishIntervalMs()); - - if (TableServiceConfig.GetEnablePublishKqpProxyByRM()) { - if (!LastPublishResourcesAt.has_value() || now - *LastPublishResourcesAt < batchingInterval) { - LastPublishResourcesAt = TAppData::TimeProvider->Now(); - Send(KqpRmServiceActor, std::make_unique<TEvKqp::TEvKqpProxyPublishRequest>()); - } - return; - } - if (ResourcesPublishScheduled) { return; } + const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); + auto now = TAppData::TimeProvider->Now(); + TDuration batchingInterval = TDuration::MilliSeconds(sbs.GetBoardPublishIntervalMs()); + if (LastPublishResourcesAt && now - *LastPublishResourcesAt < batchingInterval) { ResourcesPublishScheduled = true; Schedule(batchingInterval, new TEvPrivate::TEvReadyToPublishResources()); @@ -829,11 +829,14 @@ public: if (!TableServiceConfig.GetEnablePublishKqpProxyByRM()) { LookupPeerProxyData(); } else { - GetKqpResourceManager()->RequestClusterResourcesInfo( - [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { - TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources))); - as->Send(eh); - }); + if (SelfDataCenterId && !AppData()->TenantName.empty() && !IsLookupByRmScheduled) { + IsLookupByRmScheduled = true; + GetKqpResourceManager()->RequestClusterResourcesInfo( + [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { + TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources))); + as->Send(eh); + }); + } } if (!ShutdownRequested) { const auto& sbs = TableServiceConfig.GetSessionBalancerSettings(); @@ -875,33 +878,38 @@ public: } void Handle(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) { - if (ev->Get()->Snapshot.empty()) { - PeerProxyNodeResources.clear(); - KQP_PROXY_LOG_E("Can not find default state storage group for database " << - AppData()->TenantName); - return; - } + IsLookupByRmScheduled = false; + + TVector<NKikimrKqp::TKqpProxyNodeResources> proxyResources; + std::vector<ui64> localDatacenterProxies; + proxyResources.reserve(ev->Get()->Snapshot.size()); - Y_VERIFY(SelfDataCenterId); - PeerProxyNodeResources.resize(ev->Get()->Snapshot.size()); - size_t idx = 0; auto getDataCenterId = [](const auto& entry) { return entry.HasDataCenterId() ? entry.GetDataCenterId() : DataCenterToString(entry.GetDataCenterNumId()); }; - LocalDatacenterProxies.clear(); - for(auto& nodeResource : ev->Get()->Snapshot) { - auto* proxyNodeResources = nodeResource.MutableKqpProxyNodeResources(); - proxyNodeResources->SetNodeId(nodeResource.GetNodeId()); - - PeerProxyNodeResources[idx] = std::move(*proxyNodeResources); + for(auto& nodeResources : ev->Get()->Snapshot) { + auto* proxyNodeResources = nodeResources.MutableKqpProxyNodeResources(); - if (getDataCenterId(PeerProxyNodeResources[idx]) == *SelfDataCenterId) { - LocalDatacenterProxies.emplace_back(PeerProxyNodeResources[idx].GetNodeId()); + if (proxyNodeResources->HasNodeId()) { + proxyResources.push_back(std::move(*proxyNodeResources)); + if (getDataCenterId(proxyResources.back()) == *SelfDataCenterId) { + localDatacenterProxies.emplace_back(proxyResources.back().GetNodeId()); + } } - ++idx; } + if (proxyResources.empty()) { + PeerProxyNodeResources.clear(); + KQP_PROXY_LOG_D("Received unexpected data from rm for database " << + AppData()->TenantName); + return; + } + + Y_VERIFY(SelfDataCenterId); + PeerProxyNodeResources = std::move(proxyResources); + LocalDatacenterProxies = std::move(localDatacenterProxies); + PeerStats = CalcPeerStats(PeerProxyNodeResources, *SelfDataCenterId); TryKickSession(); } @@ -1461,13 +1469,13 @@ private: } } - void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) { + void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) { if (CheckScriptExecutionsTablesReady<TEvListScriptExecutionOperationsResponse>(ev)) { Register(CreateListScriptExecutionOperationsActor(std::move(ev))); - } + } } - void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) { + void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) { if (CheckScriptExecutionsTablesReady<TEvCancelScriptExecutionOperationResponse>(ev)) { Register(CreateCancelScriptExecutionOperationActor(std::move(ev))); } @@ -1544,6 +1552,7 @@ private: }; EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted; std::deque<THolder<IEventHandle>> DelayedEventsQueue; + bool IsLookupByRmScheduled = false; }; } // namespace diff --git a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp index 50873cec8ce..cf48f3758c9 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp +++ b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp @@ -2,6 +2,8 @@ #include <ydb/core/base/location.h> #include <ydb/core/base/statestorage.h> +#include <ydb/core/cms/console/configs_dispatcher.h> +#include <ydb/core/cms/console/console.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/mind/tenant_pool.h> #include <ydb/core/kqp/common/kqp_event_ids.h> @@ -25,19 +27,30 @@ namespace NRm { class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInfoExchangerActor> { using TBase = TActorBootstrapped<TKqpResourceInfoExchangerActor>; + static constexpr ui32 BUCKETS_COUNT = 32; + struct TEvPrivate { enum EEv { - EvRetrySending = EventSpaceBegin(TEvents::ES_PRIVATE), + EvSendToNode = EventSpaceBegin(TEvents::ES_PRIVATE), EvRegularSending, EvRetrySubscriber, EvUpdateSnapshotState, + EvRetryNode, }; - struct TEvRetrySending : - public TEventLocal<TEvRetrySending, EEv::EvRetrySending> { - ui32 NodeId; + struct TEvSendToNode : + public TEventLocal<TEvSendToNode, EEv::EvSendToNode> { + ui32 BucketInd; - TEvRetrySending(ui32 nodeId) : NodeId(nodeId) { + TEvSendToNode(ui32 bucketInd) : BucketInd(bucketInd) { + } + }; + + struct TEvRetryNode : + public TEventLocal<TEvRetryNode, EEv::EvRetryNode> { + ui32 BucketInd; + + TEvRetryNode(ui32 bucketInd) : BucketInd(bucketInd) { } }; @@ -54,31 +67,64 @@ class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInf }; }; - struct TRetryState { + struct TDelayState { bool IsScheduled = false; NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero(); - TDuration CurrentDelay = TDuration::MilliSeconds(50); + TDuration CurrentDelay = TDuration::Zero(); + }; + + struct TDelaySettings { + TDuration StartDelayMs = TDuration::MilliSeconds(500); + TDuration MaxDelayMs = TDuration::MilliSeconds(2000); }; struct TNodeState { - TRetryState RetryState; NMonotonic::TMonotonic LastUpdateAt = TMonotonic::Zero(); NKikimrKqp::TResourceExchangeNodeData NodeData; }; + struct TBucketState { + TDelayState RetryState; + TDelayState SendState; + THashSet<ui32> NodeIdsToSend; + THashSet<ui32> NodeIdsToRetry; + }; + public: TKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters, - std::shared_ptr<TResourceSnapshotState> resourceSnapshotState) + std::shared_ptr<TResourceSnapshotState> resourceSnapshotState, + const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) : ResourceSnapshotState(std::move(resourceSnapshotState)) , Counters(counters) + , Settings(settings) { - Y_UNUSED(counters); + Buckets.resize(BUCKETS_COUNT); + + const auto& publisherSettings = settings.GetPublisherSettings(); + const auto& subscriberSettings = settings.GetSubscriberSettings(); + const auto& exchangerSettings = settings.GetExchangerSettings(); + + UpdateBoardRetrySettings(publisherSettings, PublisherSettings); + UpdateBoardRetrySettings(subscriberSettings, SubscriberSettings); + + if (exchangerSettings.HasStartDelayMs()) { + ExchangerSettings.StartDelayMs = TDuration::MilliSeconds(exchangerSettings.GetStartDelayMs()); + } + if (exchangerSettings.HasMaxDelayMs()) { + ExchangerSettings.MaxDelayMs = TDuration::MilliSeconds(exchangerSettings.GetMaxDelayMs()); + } } void Bootstrap() { LOG_D("Start KqpResourceInfoExchangerActor at " << SelfId()); + ui32 tableServiceConfigKind = (ui32) NKikimrConsole::TConfigItem::TableServiceConfigItem; + + Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), + new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({tableServiceConfigKind}), + IEventHandle::FlagTrackDelivery); + Become(&TKqpResourceInfoExchangerActor::WorkState); Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe); @@ -86,6 +132,21 @@ public: private: + bool UpdateBoardRetrySettings( + const NKikimrConfig::TTableServiceConfig::TResourceManager::TRetrySettings& settings, + TBoardRetrySettings& retrySetting) { + bool ret = false; + if (settings.HasStartDelayMs()) { + ret = true; + retrySetting.StartDelayMs = TDuration::MilliSeconds(settings.GetStartDelayMs()); + } + if (settings.HasMaxDelayMs()) { + ret = true; + retrySetting.MaxDelayMs = TDuration::MilliSeconds(settings.GetMaxDelayMs()); + } + return ret; + } + static TString MakeKqpInfoExchangerBoardPath(TStringBuf database) { return TStringBuilder() << "kqpexch+" << database; } @@ -94,8 +155,8 @@ private: auto& retryState = BoardState.RetryStateForSubscriber; auto now = TlsActivationContext->Monotonic(); - if (now - retryState.LastRetryAt < retryState.CurrentDelay) { - auto at = retryState.LastRetryAt + GetRetryDelay(retryState); + if (now - retryState.LastRetryAt < GetCurrentDelay(retryState)) { + auto at = retryState.LastRetryAt + GetDelay(retryState); Schedule(at - now, new TEvPrivate::TEvRetrySubscriber); return; } @@ -108,7 +169,8 @@ private: BoardState.Subscriber = TActorId(); auto subscriber = CreateBoardLookupActor( - BoardState.Path, SelfId(), BoardState.StateStorageGroupId, EBoardLookupMode::Subscription); + BoardState.Path, SelfId(), BoardState.StateStorageGroupId, EBoardLookupMode::Subscription, + SubscriberSettings); BoardState.Subscriber = Register(subscriber); retryState.LastRetryAt = now; @@ -123,7 +185,7 @@ private: BoardState.Publisher = TActorId(); auto publisher = CreateBoardPublishActor(BoardState.Path, SelfInfo, SelfId(), - BoardState.StateStorageGroupId, /* ttlMs */ 0, /* reg */ true); + BoardState.StateStorageGroupId, /* ttlMs */ 0, /* reg */ true, PublisherSettings); BoardState.Publisher = Register(publisher); auto& nodeState = NodesState[SelfId().NodeId()]; @@ -132,20 +194,29 @@ private: ActorIdToProto(BoardState.Publisher, boardData->MutablePublisher()); } - TVector<ui32> UpdateBoardInfo(const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& infos) { + std::pair<TVector<ui32>, bool> UpdateBoardInfo(const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& infos) { auto nodeIds = TVector<ui32>(); + bool isChanged = false; auto now = TlsActivationContext->Monotonic(); - for (const auto& [id, entry] : infos) { + for (const auto& [publisherId, entry] : infos) { if (entry.Dropped) { - auto nodesStateIt = NodesState.find(id.NodeId()); + auto nodesStateIt = NodesState.find(publisherId.NodeId()); + if (nodesStateIt == NodesState.end()) { continue; } + + auto bucketInd = GetBucketInd(publisherId.NodeId()); + auto& bucket = Buckets[bucketInd]; + auto& currentBoardData = nodesStateIt->second.NodeData.GetResourceExchangeBoardData(); auto currentPublisher = ActorIdFromProto(currentBoardData.GetPublisher()); - if (currentPublisher == id) { + if (currentPublisher == publisherId) { NodesState.erase(nodesStateIt); + isChanged = true; + bucket.NodeIdsToSend.erase(publisherId.NodeId()); + bucket.NodeIdsToRetry.erase(publisherId.NodeId()); } continue; } @@ -160,11 +231,16 @@ private: auto* currentBoardData = currentInfo.MutableResourceExchangeBoardData(); auto currentUlidString = currentBoardData->GetUlid(); + auto bucketInd = GetBucketInd(owner.NodeId()); + auto& bucket = Buckets[bucketInd]; + if (currentUlidString.empty()) { *currentBoardData = std::move(boardData); - ActorIdToProto(id, currentBoardData->MutablePublisher()); + ActorIdToProto(publisherId, currentBoardData->MutablePublisher()); nodeIds.push_back(owner.NodeId()); + isChanged = true; nodeState.LastUpdateAt = now; + bucket.NodeIdsToSend.insert(owner.NodeId()); continue; } @@ -173,19 +249,25 @@ private: if (currentUlid < ulid) { *currentBoardData = std::move(boardData); - ActorIdToProto(id, currentBoardData->MutablePublisher()); + ActorIdToProto(publisherId, currentBoardData->MutablePublisher()); currentInfo.SetRound(0); + isChanged = true; (*currentInfo.MutableResources()) = NKikimrKqp::TKqpNodeResources(); nodeIds.push_back(owner.NodeId()); nodeState.LastUpdateAt = now; + bucket.NodeIdsToSend.insert(owner.NodeId()); continue; } } - return nodeIds; + Counters->RmNodeNumberInSnapshot->Set(NodesState.size()); + + return {nodeIds, isChanged}; } - void UpdateResourceInfo(const TVector<NKikimrKqp::TResourceExchangeNodeData>& infos) { + bool UpdateResourceInfo(const TVector<NKikimrKqp::TResourceExchangeNodeData>& infos) { + bool isChanged = false; + auto now = TlsActivationContext->Monotonic(); for (const auto& info : infos) { @@ -214,9 +296,12 @@ private: auto latency = now - nodeState.LastUpdateAt; Counters->RmSnapshotLatency->Collect(latency.MilliSeconds()); nodeState.LastUpdateAt = now; + isChanged = true; continue; } } + + return isChanged; } void UpdateResourceSnapshotState() { @@ -224,8 +309,8 @@ private: return; } auto now = TlsActivationContext->Monotonic(); - if (now - ResourceSnapshotRetryState.LastRetryAt < ResourceSnapshotRetryState.CurrentDelay) { - auto at = ResourceSnapshotRetryState.LastRetryAt + GetRetryDelay(ResourceSnapshotRetryState); + if (now - ResourceSnapshotRetryState.LastRetryAt < GetCurrentDelay(ResourceSnapshotRetryState)) { + auto at = ResourceSnapshotRetryState.LastRetryAt + GetDelay(ResourceSnapshotRetryState); ResourceSnapshotRetryState.IsScheduled = true; Schedule(at - now, new TEvPrivate::TEvUpdateSnapshotState); return; @@ -233,10 +318,10 @@ private: TVector<NKikimrKqp::TKqpNodeResources> resources; resources.reserve(NodesState.size()); - for (const auto& [id, state] : NodesState) { - auto currentResources = state.NodeData.GetResources(); - if (currentResources.GetNodeId()) { - resources.push_back(currentResources); + for (const auto& [nodeId, state] : NodesState) { + const auto& currentResources = state.NodeData.GetResources(); + if (currentResources.HasNodeId()) { + resources.push_back(std::move(currentResources)); } } @@ -248,40 +333,92 @@ private: ResourceSnapshotRetryState.LastRetryAt = now; } - void SendInfos(TVector<ui32> infoNodeIds, TVector<ui32> nodeIds = {}) { + void SendInfos(TVector<ui32> infoNodeIds, bool resending = false, TVector<ui32> nodeIds = {}) { auto& nodeState = NodesState[SelfId().NodeId()]; nodeState.NodeData.SetRound(Round++); if (!nodeIds.empty()) { - auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, true); + auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, resending); for (const auto& nodeId : nodeIds) { auto nodesStateIt = NodesState.find(nodeId); if (nodesStateIt == NodesState.end()) { - return; + continue; + } + + auto owner = ActorIdFromProto(nodesStateIt->second.NodeData.GetResourceExchangeBoardData().GetOwner()); + if (owner != SelfId()) { + auto msg = MakeHolder<TEvKqpResourceInfoExchanger::TEvSendResources>(); + msg->Record = snapshotMsg; + Send(owner, msg.Release(), IEventHandle::FlagSubscribeOnSession); } - SendingToNode(nodesStateIt->second, true, {}, snapshotMsg); } } else { auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, false); - TDuration currentLatency = TDuration::MilliSeconds(0); - auto nowMonotic = TlsActivationContext->Monotonic(); - - for (auto& [id, state] : NodesState) { - SendingToNode(state, true, {}, snapshotMsg); - if (id != SelfId().NodeId()) { - currentLatency = Max(currentLatency, nowMonotic - state.LastUpdateAt); + for (size_t idx = 0; idx < BUCKETS_COUNT; idx++) { + auto& bucket = Buckets[idx]; + if (!bucket.NodeIdsToSend.empty()) { + SendingToBucketNodes(idx, bucket, false, {}, snapshotMsg); } } - - Counters->RmMaxSnapshotLatency->Set(currentLatency.MilliSeconds()); } } private: + void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { + LOG_D("Subscribed for config changes."); + } + + void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { + auto &event = ev->Get()->Record; + + NKikimrConfig::TTableServiceConfig tableServiceConfig; + + tableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig()); + LOG_D("Updated table service config."); + + const auto& infoExchangerSettings = tableServiceConfig.GetResourceManager().GetInfoExchangerSettings(); + const auto& publisherSettings = infoExchangerSettings.GetPublisherSettings(); + const auto& subscriberSettings = infoExchangerSettings.GetSubscriberSettings(); + const auto& exchangerSettings = infoExchangerSettings.GetExchangerSettings(); + + if (UpdateBoardRetrySettings(publisherSettings, PublisherSettings)) { + CreatePublisher(); + } + + if (UpdateBoardRetrySettings(subscriberSettings, SubscriberSettings)) { + CreateSubscriber(); + } + + if (exchangerSettings.HasStartDelayMs()) { + ExchangerSettings.StartDelayMs = TDuration::MilliSeconds(exchangerSettings.GetStartDelayMs()); + } + if (exchangerSettings.HasMaxDelayMs()) { + ExchangerSettings.MaxDelayMs = TDuration::MilliSeconds(exchangerSettings.GetMaxDelayMs()); + } + + auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event); + Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); + } + + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + switch (ev->Get()->SourceType) { + case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest: + LOG_C("Failed to deliver subscription request to config dispatcher."); + break; + + case NConsole::TEvConsole::EvConfigNotificationResponse: + LOG_E("Failed to deliver config notification response."); + break; + + default: + break; + } + } + void Handle(TEvTenantPool::TEvTenantPoolStatus::TPtr& ev) { TString tenant; for (auto &slot : ev->Get()->Record.GetSlots()) { @@ -332,8 +469,6 @@ private: LOG_I("Received tenant pool status for exchanger, serving tenant: " << BoardState.Tenant << ", board: " << BoardState.Path << ", ssGroupId: " << BoardState.StateStorageGroupId); - - Schedule(TDuration::Seconds(2), new TEvPrivate::TEvRegularSending); } @@ -351,11 +486,15 @@ private: << ", ssGroupId: " << BoardState.StateStorageGroupId << ", with size: " << ev->Get()->InfoEntries.size()); - auto nodeIds = UpdateBoardInfo(ev->Get()->InfoEntries); + auto [nodeIds, isChanged] = UpdateBoardInfo(ev->Get()->InfoEntries); - SendInfos({SelfId().NodeId()}, std::move(nodeIds)); + if (!nodeIds.empty()) { + SendInfos({SelfId().NodeId()}, true, std::move(nodeIds)); + } - UpdateResourceSnapshotState(); + if (isChanged) { + UpdateResourceSnapshotState(); + } } void Handle(TEvStateStorage::TEvBoardInfoUpdate::TPtr& ev) { @@ -371,11 +510,15 @@ private: << ", ssGroupId: " << BoardState.StateStorageGroupId << ", with size: " << ev->Get()->Updates.size()); - auto nodeIds = UpdateBoardInfo(ev->Get()->Updates); + auto [nodeIds, isChanged] = UpdateBoardInfo(ev->Get()->Updates); - SendInfos({SelfId().NodeId()}, std::move(nodeIds)); + if (!nodeIds.empty()) { + SendInfos({SelfId().NodeId()}, true, std::move(nodeIds)); + } - UpdateResourceSnapshotState(); + if (isChanged) { + UpdateResourceSnapshotState(); + } } void Handle(TEvKqpResourceInfoExchanger::TEvPublishResource::TPtr& ev) { @@ -395,40 +538,56 @@ private: const TVector<NKikimrKqp::TResourceExchangeNodeData> resourceInfos( ev->Get()->Record.GetSnapshot().begin(), ev->Get()->Record.GetSnapshot().end()); - UpdateResourceInfo(resourceInfos); + bool isChanged = UpdateResourceInfo(resourceInfos); if (ev->Get()->Record.GetNeedResending()) { auto nodesStateIt = NodesState.find(nodeId); - if (nodesStateIt == NodesState.end()) { - return; + if (nodesStateIt != NodesState.end()) { + SendInfos({SelfId().NodeId()}, false, {nodesStateIt->first}); } - SendingToNode(nodesStateIt->second, false, {SelfId().NodeId()}); } - UpdateResourceSnapshotState(); + if (isChanged) { + UpdateResourceSnapshotState(); + } } void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { const auto& nodeId = ev->Get()->NodeId; - auto nodesStateIt = NodesState.find(nodeId); - if (nodesStateIt == NodesState.end()) { + if (NodesState.find(nodeId) == NodesState.end()) { return; } - SendingToNode(nodesStateIt->second, true, {SelfId().NodeId()}); + + auto bucketInd = GetBucketInd(nodeId); + auto& bucket = Buckets[bucketInd]; + + bucket.NodeIdsToRetry.insert(nodeId); + + SendingToBucketNodes(bucketInd, bucket, true, {SelfId().NodeId()}); } - void Handle(TEvPrivate::TEvRetrySending::TPtr& ev) { - const auto& nodeId = ev->Get()->NodeId; - auto nodesStateIt = NodesState.find(nodeId); + void Handle(TEvPrivate::TEvSendToNode::TPtr& ev) { + const auto& bucketInd = ev->Get()->BucketInd; + auto& bucket = Buckets[bucketInd]; - if (nodesStateIt == NodesState.end()) { - return; + bucket.SendState.IsScheduled = false; + + if (!bucket.NodeIdsToSend.empty()) { + SendingToBucketNodes(bucketInd, bucket, false, {SelfId().NodeId()}); } - nodesStateIt->second.RetryState.IsScheduled = false; + } - SendingToNode(nodesStateIt->second, true, {SelfId().NodeId()}); + void Handle(TEvPrivate::TEvRetryNode::TPtr& ev) { + const auto& bucketInd = ev->Get()->BucketInd; + auto& bucket = Buckets[bucketInd]; + + bucket.RetryState.IsScheduled = false; + + if (!bucket.NodeIdsToRetry.empty()) { + SendingToBucketNodes(bucketInd, bucket, true, {SelfId().NodeId()}); + } } void Handle(TEvPrivate::TEvRetrySubscriber::TPtr&) { @@ -456,12 +615,16 @@ private: STATEFN(WorkState) { switch (ev->GetTypeRewrite()) { hFunc(TEvStateStorage::TEvBoardInfo, Handle); + hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); + hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle); + hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvStateStorage::TEvBoardInfoUpdate, Handle); hFunc(TEvTenantPool::TEvTenantPoolStatus, Handle); hFunc(TEvKqpResourceInfoExchanger::TEvPublishResource, Handle); hFunc(TEvKqpResourceInfoExchanger::TEvSendResources, Handle); hFunc(TEvPrivate::TEvRetrySubscriber, Handle); - hFunc(TEvPrivate::TEvRetrySending, Handle); + hFunc(TEvPrivate::TEvSendToNode, Handle); + hFunc(TEvPrivate::TEvRetryNode, Handle); hFunc(TEvPrivate::TEvRegularSending, Handle); hFunc(TEvPrivate::TEvUpdateSnapshotState, Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); @@ -478,47 +641,84 @@ private: TActor::PassAway(); } - TDuration GetRetryDelay(TRetryState& state) { + const TDuration& GetCurrentDelay(TDelayState& state) { + if (state.CurrentDelay == TDuration::Zero()) { + state.CurrentDelay = ExchangerSettings.StartDelayMs; + } + return state.CurrentDelay; + } + + TDuration GetDelay(TDelayState& state) { auto newDelay = state.CurrentDelay; newDelay *= 2; - if (newDelay > TDuration::Seconds(2)) { - newDelay = TDuration::Seconds(2); + if (newDelay > ExchangerSettings.MaxDelayMs) { + newDelay = ExchangerSettings.MaxDelayMs; } - newDelay *= AppData()->RandomProvider->Uniform(10, 200); + newDelay *= AppData()->RandomProvider->Uniform(50, 200); newDelay /= 100; state.CurrentDelay = newDelay; return state.CurrentDelay; } - void SendingToNode(TNodeState& state, bool retry, + ui32 GetBucketInd(ui32 nodeId) { + return nodeId & (BUCKETS_COUNT - 1); + } + + void SendingToBucketNodes(ui32 bucketInd, TBucketState& bucketState, bool retry, TVector<ui32> nodeIdsForSnapshot = {}, NKikimrKqp::TResourceExchangeSnapshot snapshotMsg = {}) { - auto& retryState = state.RetryState; - if (retryState.IsScheduled) { - return; + TDelayState* retryState; + if (retry) { + retryState = &bucketState.RetryState; + } else { + retryState = &bucketState.SendState; } - auto owner = ActorIdFromProto(state.NodeData.GetResourceExchangeBoardData().GetOwner()); - auto nodeId = owner.NodeId(); + if (retryState->IsScheduled) { + return; + } auto now = TlsActivationContext->Monotonic(); - if (retry && now - retryState.LastRetryAt < retryState.CurrentDelay) { - auto at = retryState.LastRetryAt + GetRetryDelay(retryState); - retryState.IsScheduled = true; - Schedule(at - now, new TEvPrivate::TEvRetrySending(nodeId)); + if (now - retryState->LastRetryAt < GetCurrentDelay(*retryState)) { + auto at = retryState->LastRetryAt + GetDelay(*retryState); + retryState->IsScheduled = true; + if (retry) { + Schedule(at - now, new TEvPrivate::TEvRetryNode(bucketInd)); + } else { + Schedule(at - now, new TEvPrivate::TEvSendToNode(bucketInd)); + } return; } - if (owner != SelfId()) { - auto msg = MakeHolder<TEvKqpResourceInfoExchanger::TEvSendResources>(); - if (nodeIdsForSnapshot.empty()) { - msg->Record = std::move(snapshotMsg); - } else { - msg->Record = CreateSnapshotMessage(nodeIdsForSnapshot); + if (!nodeIdsForSnapshot.empty()) { + snapshotMsg = CreateSnapshotMessage(nodeIdsForSnapshot); + } + + THashSet<ui32>* nodeIds; + if (retry) { + nodeIds = &bucketState.NodeIdsToRetry; + } else { + nodeIds = &bucketState.NodeIdsToSend; + } + + for (const auto& nodeId: *nodeIds) { + auto nodesStateIt = NodesState.find(nodeId); + if (nodesStateIt == NodesState.end()) { + continue; + } + auto& nodeState = nodesStateIt->second; + auto owner = ActorIdFromProto(nodeState.NodeData.GetResourceExchangeBoardData().GetOwner()); + if (owner != SelfId()) { + auto msg = MakeHolder<TEvKqpResourceInfoExchanger::TEvSendResources>(); + msg->Record = snapshotMsg; + Send(owner, msg.Release(), IEventHandle::FlagSubscribeOnSession); } - Send(owner, msg.Release(), IEventHandle::FlagSubscribeOnSession); } - retryState.LastRetryAt = now; + if (retry) { + bucketState.NodeIdsToRetry.clear(); + } + + retryState->LastRetryAt = now; } NKikimrKqp::TResourceExchangeSnapshot CreateSnapshotMessage(const TVector<ui32>& nodeIds, @@ -530,7 +730,13 @@ private: for (const auto& nodeId: nodeIds) { auto* el = snapshot->Add(); - *el = NodesState[nodeId].NodeData; + + auto nodesStateIt = NodesState.find(nodeId); + if (nodesStateIt == NodesState.end()) { + continue; + } + + *el = nodesStateIt->second.NodeData; } return snapshotMsg; @@ -550,23 +756,30 @@ private: ui32 StateStorageGroupId = std::numeric_limits<ui32>::max(); TActorId Publisher = TActorId(); TActorId Subscriber = TActorId(); - TRetryState RetryStateForSubscriber; + TDelayState RetryStateForSubscriber; std::optional<TInstant> LastPublishTime; }; TBoardState BoardState; - TRetryState ResourceSnapshotRetryState; + TDelayState ResourceSnapshotRetryState; + TVector<TBucketState> Buckets; THashMap<ui32, TNodeState> NodesState; std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState; + TBoardRetrySettings PublisherSettings; + TBoardRetrySettings SubscriberSettings; + TDelaySettings ExchangerSettings; + TIntrusivePtr<TKqpCounters> Counters; + NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings Settings; }; NActors::IActor* CreateKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters, - std::shared_ptr<TResourceSnapshotState> resourceSnapshotState) + std::shared_ptr<TResourceSnapshotState> resourceSnapshotState, + const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) { - return new TKqpResourceInfoExchangerActor(counters, std::move(resourceSnapshotState)); + return new TKqpResourceInfoExchangerActor(counters, std::move(resourceSnapshotState), settings); } } // namespace NRm diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index ef6dab2ac6e..73ec85b8336 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -153,8 +153,18 @@ public: UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes()); if (PublishResourcesByExchanger) { + CreateResourceInfoExchanger(Config.GetInfoExchangerSettings()); + return; + } + } + + void CreateResourceInfoExchanger( + const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) { + PublishResourcesByExchanger = true; + if (!ResourceInfoExchanger) { ResourceSnapshotState = std::make_shared<TResourceSnapshotState>(); - auto exchanger = CreateKqpResourceInfoExchangerActor(Counters, ResourceSnapshotState); + auto exchanger = CreateKqpResourceInfoExchangerActor( + Counters, ResourceSnapshotState, settings); ResourceInfoExchanger = ActorSystem->Register(exchanger); return; } @@ -550,11 +560,14 @@ public: void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override { LOG_AS_D("Schedule Snapshot request"); if (PublishResourcesByExchanger) { - std::shared_ptr<const TVector<NKikimrKqp::TKqpNodeResources>> infos; + std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos; with_lock (ResourceSnapshotState->Lock) { infos = ResourceSnapshotState->Snapshot; } - TVector<NKikimrKqp::TKqpNodeResources> resources = *infos; + TVector<NKikimrKqp::TKqpNodeResources> resources; + if (infos != nullptr) { + resources = *infos; + } callback(std::move(resources)); return; } @@ -778,6 +791,11 @@ private: } void HandleWork(TEvKqp::TEvKqpProxyPublishRequest::TPtr&) { + SendWhiteboardRequest(); + if (AppData()->TenantName.empty() || !SelfDataCenterId) { + LOG_E("Cannot start publishing usage for kqp_proxy, tenants: " << AppData()->TenantName << ", " << SelfDataCenterId.value_or("empty")); + return; + } PublishResourceUsage("kqp_proxy"); } @@ -821,13 +839,14 @@ private: } void Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev) { - auto selfDataCenterId = TString(); + SelfDataCenterId = TString(); if (const auto& node = ev->Get()->Node) { - selfDataCenterId = node->Location.GetDataCenterId(); + SelfDataCenterId = node->Location.GetDataCenterId(); } - ProxyNodeResources.SetDataCenterNumId(DataCenterFromString(selfDataCenterId)); - ProxyNodeResources.SetDataCenterId(selfDataCenterId); + ProxyNodeResources.SetNodeId(SelfId().NodeId()); + ProxyNodeResources.SetDataCenterNumId(DataCenterFromString(*SelfDataCenterId)); + ProxyNodeResources.SetDataCenterId(*SelfDataCenterId); PublishResourceUsage("data_center update"); } @@ -869,6 +888,23 @@ private: auto& config = *event.MutableConfig()->MutableTableServiceConfig()->MutableResourceManager(); ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes()); + bool enablePublishResourcesByExchanger = config.GetEnablePublishResourcesByExchanger(); + if (enablePublishResourcesByExchanger != PublishResourcesByExchanger) { + PublishResourcesByExchanger = enablePublishResourcesByExchanger; + if (enablePublishResourcesByExchanger) { + ResourceManager->CreateResourceInfoExchanger(config.GetInfoExchangerSettings()); + PublishResourceUsage("exchanger enabled"); + } else { + if (ResourceManager->ResourceInfoExchanger) { + Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison); + ResourceManager->ResourceInfoExchanger = TActorId(); + } + ResourceManager->PublishResourcesByExchanger = false; + ResourceManager->ResourceSnapshotState.reset(); + PublishResourceUsage("exchanger disabled"); + } + } + #define FORCE_VALUE(name) if (!config.Has ## name ()) config.Set ## name(config.Get ## name()); FORCE_VALUE(ComputeActorsCount) FORCE_VALUE(ChannelBufferSize) @@ -887,6 +923,7 @@ private: ResourceManager->ExecutionUnitsResource.SetNewLimit(config.GetComputeActorsCount()); ResourceManager->Config.Swap(&config); } + } static void HandleWork(TEvents::TEvUndelivered::TPtr& ev) { @@ -1010,9 +1047,10 @@ private: ToBroker(new TEvResourceBroker::TEvNotifyActorDied); if (ResourceManager->ResourceInfoExchanger) { Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison); + ResourceManager->ResourceInfoExchanger = TActorId(); } + ResourceManager->ResourceSnapshotState.reset(); if (WbState.BoardPublisherActorId) { - Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison); } TActor::PassAway(); @@ -1055,13 +1093,17 @@ private: NKikimrKqp::TKqpNodeResources payload; payload.SetNodeId(SelfId().NodeId()); payload.SetTimestamp(now.Seconds()); - auto* proxyNodeResources = payload.MutableKqpProxyNodeResources(); if (KqpProxySharedResources) { - ProxyNodeResources.SetActiveWorkersCount(KqpProxySharedResources->AtomicLocalSessionCount.load()); + if (SelfDataCenterId) { + auto* proxyNodeResources = payload.MutableKqpProxyNodeResources(); + ProxyNodeResources.SetActiveWorkersCount(KqpProxySharedResources->AtomicLocalSessionCount.load()); + if (SelfDataCenterId) { + *proxyNodeResources = ProxyNodeResources; + } + } } else { LOG_D("Don't set KqpProxySharedResources"); } - *proxyNodeResources = ProxyNodeResources; ActorIdToProto(MakeKqpResourceManagerServiceID(SelfId().NodeId()), payload.MutableResourceManagerActorId()); // legacy with_lock (ResourceManager->Lock) { payload.SetAvailableComputeActors(ResourceManager->ExecutionUnitsResource.Available()); // legacy @@ -1079,8 +1121,10 @@ private: << "reason: " << reason << ", payload: " << payload.ShortDebugString()); WbState.LastPublishTime = now; - Send(ResourceManager->ResourceInfoExchanger, - new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload))); + if (ResourceManager->ResourceInfoExchanger) { + Send(ResourceManager->ResourceInfoExchanger, + new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload))); + } return; } @@ -1129,6 +1173,7 @@ private: std::shared_ptr<TKqpResourceManager> ResourceManager; bool PublishResourcesByExchanger; + std::optional<TString> SelfDataCenterId; }; } // namespace NRm diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index 2f149098b38..fbcff7151cc 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -125,7 +125,8 @@ struct TEvKqpResourceInfoExchanger { }; NActors::IActor* CreateKqpResourceInfoExchangerActor(TIntrusivePtr<TKqpCounters> counters, - std::shared_ptr<TResourceSnapshotState> resourceSnapshotState); + std::shared_ptr<TResourceSnapshotState> resourceSnapshotState, + const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings); } // namespace NRm diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp index e634bd4e0a6..8057cba687b 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp @@ -103,6 +103,11 @@ NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfi config.SetQueryMemoryLimit(1000); config.SetEnablePublishResourcesByExchanger(EnablePublishResourcesByExchanger); + auto* infoExchangerRetrySettings = config.MutableInfoExchangerSettings(); + auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings(); + exchangerSettings->SetStartDelayMs(50); + exchangerSettings->SetMaxDelayMs(50); + return config; } @@ -505,7 +510,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) { auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); @@ -519,12 +524,10 @@ void KqpRm::SnapshotSharing(bool byExchanger) { bool allocated = rm_first->AllocateResources(1, 2, request); UNIT_ASSERT(allocated); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); - allocated = rm_first->AllocateResources(2, 1, request); UNIT_ASSERT(allocated); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm_second); } @@ -533,12 +536,10 @@ void KqpRm::SnapshotSharing(bool byExchanger) { bool allocated = rm_second->AllocateResources(1, 2, request); UNIT_ASSERT(allocated); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); - allocated = rm_second->AllocateResources(2, 1, request); UNIT_ASSERT(allocated); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(1, {{800, 80}, {800, 80}}, rm_first); } @@ -547,7 +548,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) { rm_first->FreeResources(1); rm_first->FreeResources(2); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(0, {{1000, 100}, {800, 80}}, rm_second); } @@ -556,7 +557,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) { rm_second->FreeResources(1); rm_second->FreeResources(2); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_first); } @@ -577,7 +578,7 @@ void KqpRm::NodesMembership(bool byExchanger) { auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); @@ -591,7 +592,7 @@ void KqpRm::NodesMembership(bool byExchanger) { options.FinalEvents.emplace_back(TEvents::TSystem::Poison, 1); UNIT_ASSERT(Runtime->DispatchEvents(options)); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(0, {{1000, 100}}, rm_first); } @@ -611,7 +612,7 @@ void KqpRm::DisonnectNodes() { auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); @@ -627,6 +628,8 @@ void KqpRm::DisonnectNodes() { Disconnect(0, 1); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); + CheckSnapshot(0, {{1000, 100}}, rm_first); } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 910cee52d33..cc852fa465f 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -86,6 +86,11 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> { TKikimrSettings() { FeatureFlags.SetForceColumnTablesCompositeMarks(true); + auto* tableServiceConfig = AppConfig.MutableTableServiceConfig(); + auto* infoExchangerRetrySettings = tableServiceConfig->MutableResourceManager()->MutableInfoExchangerSettings(); + auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings(); + exchangerSettings->SetStartDelayMs(10); + exchangerSettings->SetMaxDelayMs(10); } TKikimrSettings& SetAppConfig(const NKikimrConfig::TAppConfig& value) { AppConfig = value; return *this; } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index b00ae5d0ce1..d0fabe1b370 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1113,6 +1113,15 @@ message TTableServiceConfig { } message TResourceManager { + message TRetrySettings { + optional uint32 StartDelayMs = 1; + optional uint32 MaxDelayMs = 2; + } + message TInfoExchangerSettings { + optional TRetrySettings PublisherSettings = 1; + optional TRetrySettings SubscriberSettings = 2; + optional TRetrySettings ExchangerSettings = 3; + } optional uint32 ComputeActorsCount = 1 [default = 1000]; optional uint64 ChannelBufferSize = 2 [default = 8388608]; // 8 MB reserved 3; @@ -1132,6 +1141,7 @@ message TTableServiceConfig { optional TShardsScanningPolicy ShardsScanningPolicy = 16; optional uint64 KqpPatternCacheCapacityBytes = 17 [default = 104857600]; // 100 MiB, 0 is for disable optional bool EnablePublishResourcesByExchanger = 18 [default = true]; + optional TInfoExchangerSettings InfoExchangerSettings = 19; } message TSpillingServiceConfig { @@ -1311,7 +1321,7 @@ message TTableServiceConfig { optional bool EnableKqpDataQueryStreamLookup = 31 [default = false]; optional TExecuterRetriesConfig ExecuterRetriesConfig = 32; reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false]; - optional bool EnablePublishKqpProxyByRM = 34 [default = false]; + optional bool EnablePublishKqpProxyByRM = 34 [default = true]; optional bool EnableKqpScanQueryStreamIdxLookupJoin = 35 [default = false]; optional bool EnablePredicateExtractForScanQueries = 36 [default = true]; optional bool EnablePredicateExtractForDataQueries = 37 [default = true]; |