diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-07-06 19:23:12 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-07-06 19:23:12 +0300 |
commit | 881a94d5b1395a4d439e6517f933f0253f3d6c80 (patch) | |
tree | 02e8d37b2fd3993e9001ff53b17649308641b286 | |
parent | 8e303d9d16ea628a70ef50c435f225b8c4110641 (diff) | |
download | ydb-881a94d5b1395a4d439e6517f933f0253f3d6c80.tar.gz |
KIKIMR-16187: update TKqpResourceInfoExchangerActor
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp | 66 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.cpp | 39 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_ut.cpp | 26 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 2 |
6 files changed, 119 insertions, 16 deletions
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 8efd5f0b86..9c8bee6885 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -761,6 +761,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co RmInternalError = KqpGroup->GetCounter("RM/InternalError", true); RmSnapshotLatency = KqpGroup->GetHistogram( "RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1)); + RmMaxSnapshotLatency = KqpGroup->GetCounter("RM/MaxSnapshotLatency", false); /* Spilling */ SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 0d2943e0fb..5862787aaf 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -357,6 +357,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemAllocs; ::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError; NMonitoring::THistogramPtr RmSnapshotLatency; + ::NMonitoring::TDynamicCounters::TCounterPtr RmMaxSnapshotLatency; // Spilling counters ::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs; diff --git a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp index 836c61262d..50873cec8c 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp +++ b/ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp @@ -28,7 +28,9 @@ class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInf struct TEvPrivate { enum EEv { EvRetrySending = EventSpaceBegin(TEvents::ES_PRIVATE), + EvRegularSending, EvRetrySubscriber, + EvUpdateSnapshotState, }; struct TEvRetrySending : @@ -39,15 +41,23 @@ class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInf } }; + struct TEvRegularSending : + public TEventLocal<TEvRegularSending, EEv::EvRegularSending> { + }; + struct TEvRetrySubscriber : public TEventLocal<TEvRetrySubscriber, EEv::EvRetrySubscriber> { }; + + struct TEvUpdateSnapshotState: + public TEventLocal<TEvUpdateSnapshotState, EEv::EvUpdateSnapshotState> { + }; }; struct TRetryState { bool IsScheduled = false; NMonotonic::TMonotonic LastRetryAt = TMonotonic::Zero(); - TDuration CurrentDelay = TDuration::MilliSeconds(10); + TDuration CurrentDelay = TDuration::MilliSeconds(50); }; struct TNodeState { @@ -210,8 +220,19 @@ private: } void UpdateResourceSnapshotState() { + if (ResourceSnapshotRetryState.IsScheduled) { + return; + } + auto now = TlsActivationContext->Monotonic(); + if (now - ResourceSnapshotRetryState.LastRetryAt < ResourceSnapshotRetryState.CurrentDelay) { + auto at = ResourceSnapshotRetryState.LastRetryAt + GetRetryDelay(ResourceSnapshotRetryState); + ResourceSnapshotRetryState.IsScheduled = true; + Schedule(at - now, new TEvPrivate::TEvUpdateSnapshotState); + return; + } TVector<NKikimrKqp::TKqpNodeResources> resources; resources.reserve(NodesState.size()); + for (const auto& [id, state] : NodesState) { auto currentResources = state.NodeData.GetResources(); if (currentResources.GetNodeId()) { @@ -223,6 +244,8 @@ private: ResourceSnapshotState->Snapshot = std::make_shared<TVector<NKikimrKqp::TKqpNodeResources>>(std::move(resources)); } + + ResourceSnapshotRetryState.LastRetryAt = now; } void SendInfos(TVector<ui32> infoNodeIds, TVector<ui32> nodeIds = {}) { @@ -238,14 +261,22 @@ private: if (nodesStateIt == NodesState.end()) { return; } - SendingToNode(nodesStateIt->second, false, {}, snapshotMsg); + SendingToNode(nodesStateIt->second, true, {}, snapshotMsg); } } else { auto snapshotMsg = CreateSnapshotMessage(infoNodeIds, false); + TDuration currentLatency = TDuration::MilliSeconds(0); + auto nowMonotic = TlsActivationContext->Monotonic(); + for (auto& [id, state] : NodesState) { - SendingToNode(state, false, {}, snapshotMsg); + SendingToNode(state, true, {}, snapshotMsg); + if (id != SelfId().NodeId()) { + currentLatency = Max(currentLatency, nowMonotic - state.LastUpdateAt); + } } + + Counters->RmMaxSnapshotLatency->Set(currentLatency.MilliSeconds()); } } @@ -301,6 +332,8 @@ private: LOG_I("Received tenant pool status for exchanger, serving tenant: " << BoardState.Tenant << ", board: " << BoardState.Path << ", ssGroupId: " << BoardState.StateStorageGroupId); + + Schedule(TDuration::Seconds(2), new TEvPrivate::TEvRegularSending); } @@ -312,6 +345,7 @@ private: CreateSubscriber(); return; } + LOG_D("Get board info from subscriber, serving tenant: " << BoardState.Tenant << ", board: " << BoardState.Path << ", ssGroupId: " << BoardState.StateStorageGroupId @@ -405,6 +439,19 @@ private: PassAway(); } + void Handle(TEvPrivate::TEvUpdateSnapshotState::TPtr&) { + ResourceSnapshotRetryState.IsScheduled = false; + + UpdateResourceSnapshotState(); + } + + void Handle(TEvPrivate::TEvRegularSending::TPtr&) { + SendInfos({SelfId().NodeId()}); + + Schedule(TDuration::Seconds(2), new TEvPrivate::TEvRegularSending); + } + + private: STATEFN(WorkState) { switch (ev->GetTypeRewrite()) { @@ -415,6 +462,8 @@ private: hFunc(TEvKqpResourceInfoExchanger::TEvSendResources, Handle); hFunc(TEvPrivate::TEvRetrySubscriber, Handle); hFunc(TEvPrivate::TEvRetrySending, Handle); + hFunc(TEvPrivate::TEvRegularSending, Handle); + hFunc(TEvPrivate::TEvUpdateSnapshotState, Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); hFunc(TEvents::TEvPoison, Handle); } @@ -430,16 +479,15 @@ private: } TDuration GetRetryDelay(TRetryState& state) { - auto ret = state.CurrentDelay; auto newDelay = state.CurrentDelay; newDelay *= 2; - if (newDelay > TDuration::Seconds(1)) { - newDelay = TDuration::Seconds(1); + if (newDelay > TDuration::Seconds(2)) { + newDelay = TDuration::Seconds(2); } - newDelay *= AppData()->RandomProvider->Uniform(100, 115); + newDelay *= AppData()->RandomProvider->Uniform(10, 200); newDelay /= 100; state.CurrentDelay = newDelay; - return ret; + return state.CurrentDelay; } void SendingToNode(TNodeState& state, bool retry, @@ -507,6 +555,8 @@ private: }; TBoardState BoardState; + TRetryState ResourceSnapshotRetryState; + THashMap<ui32, TNodeState> NodesState; std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 97d7fadf66..ef6dab2ac6 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -912,6 +912,16 @@ private: void HandleWork(NMon::TEvHttpInfo::TPtr& ev) { TStringStream str; str.Reserve(8 * 1024); + + auto snapshot = TVector<NKikimrKqp::TKqpNodeResources>(); + + if (PublishResourcesByExchanger) { + ResourceManager->RequestClusterResourcesInfo( + [&snapshot](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { + snapshot = std::move(resources); + }); + } + HTML(str) { PRE() { str << "Current config:" << Endl; @@ -960,6 +970,35 @@ private: } } // with_lock (bucket.Lock) } + + if (snapshot.empty()) { + str << "No nodes resource info" << Endl; + } else { + str << Endl << "Resources info: " << Endl; + str << "Nodes count: " << snapshot.size() << Endl; + str << Endl; + for(const auto& entry : snapshot) { + str << " NodeId: " << entry.GetNodeId() << Endl; + str << " ResourceManagerActorId: " << entry.GetResourceManagerActorId() << Endl; + str << " AvailableComputeActors: " << entry.GetAvailableComputeActors() << Endl; + str << " UsedMemory: " << entry.GetUsedMemory() << Endl; + str << " TotalMemory: " << entry.GetTotalMemory() << Endl; + str << " Transactions:" << Endl; + for (const auto& tx: entry.GetTransactions()) { + str << " TxId: " << tx.GetTxId() << Endl; + str << " ComputeActors: " << tx.GetComputeActors() << Endl; + str << " Memory: " << tx.GetMemory() << Endl; + str << " StartTimestamp: " << tx.GetStartTimestamp() << Endl; + } + str << " Timestamp: " << entry.GetTimestamp() << Endl; + str << " Memory:" << Endl;; + for (const auto& memoryInfo: entry.GetMemory()) { + str << " Pool: " << memoryInfo.GetPool() << Endl; + str << " Available: " << memoryInfo.GetAvailable() << Endl; + } + str << " ExecutionUnits: " << entry.GetExecutionUnits() << Endl; + } + } } // PRE() } diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp index f109e92585..e634bd4e0a 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp @@ -224,7 +224,7 @@ public: }); while (ready.load() != 1) { - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(100)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); } if (snapshot.size() != verificationData.size()) { @@ -439,7 +439,7 @@ void KqpRm::Snapshot(bool byExchanger) { AssertResourceManagerStats(rm, 800, 80); AssertResourceBrokerSensors(0, 200, 0, 0, 2); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm); @@ -448,7 +448,7 @@ void KqpRm::Snapshot(bool byExchanger) { AssertResourceManagerStats(rm, 1000, 100); AssertResourceBrokerSensors(0, 0, 0, 2, 0); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm); } @@ -505,6 +505,8 @@ void KqpRm::SnapshotSharing(bool byExchanger) { auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); @@ -517,10 +519,12 @@ void KqpRm::SnapshotSharing(bool byExchanger) { bool allocated = rm_first->AllocateResources(1, 2, request); UNIT_ASSERT(allocated); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + allocated = rm_first->AllocateResources(2, 1, request); UNIT_ASSERT(allocated); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); CheckSnapshot(0, {{800, 80}, {1000, 100}}, rm_second); } @@ -529,10 +533,12 @@ void KqpRm::SnapshotSharing(bool byExchanger) { bool allocated = rm_second->AllocateResources(1, 2, request); UNIT_ASSERT(allocated); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + allocated = rm_second->AllocateResources(2, 1, request); UNIT_ASSERT(allocated); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); CheckSnapshot(1, {{800, 80}, {800, 80}}, rm_first); } @@ -541,7 +547,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) { rm_first->FreeResources(1); rm_first->FreeResources(2); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); CheckSnapshot(0, {{1000, 100}, {800, 80}}, rm_second); } @@ -550,7 +556,7 @@ void KqpRm::SnapshotSharing(bool byExchanger) { rm_second->FreeResources(1); rm_second->FreeResources(2); - Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_first); } @@ -571,6 +577,8 @@ void KqpRm::NodesMembership(bool byExchanger) { auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); @@ -583,6 +591,8 @@ void KqpRm::NodesMembership(bool byExchanger) { options.FinalEvents.emplace_back(TEvents::TSystem::Poison, 1); UNIT_ASSERT(Runtime->DispatchEvents(options)); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(4)); + CheckSnapshot(0, {{1000, 100}}, rm_first); } @@ -601,6 +611,8 @@ void KqpRm::DisonnectNodes() { auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); auto rm_second = GetKqpResourceManager(ResourceManagers[1].NodeId()); + Runtime->DispatchEvents(TDispatchOptions(), TDuration::MilliSeconds(500)); + CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm_first); CheckSnapshot(1, {{1000, 100}, {1000, 100}}, rm_second); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 5c784ee380..50dc1d313e 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1127,7 +1127,7 @@ message TTableServiceConfig { reserved 14; optional TShardsScanningPolicy ShardsScanningPolicy = 16; optional uint64 KqpPatternCacheCapacityBytes = 17 [default = 104857600]; // 100 MiB, 0 is for disable - optional bool EnablePublishResourcesByExchanger = 18 [default = false]; + optional bool EnablePublishResourcesByExchanger = 18 [default = true]; } message TSpillingServiceConfig { |