diff options
author | hor911 <hor911@ydb.tech> | 2023-05-10 14:13:12 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-05-10 14:13:12 +0300 |
commit | 47a1a408ef865d9fa115bcbb3dcb8b652edaffe5 (patch) | |
tree | 714caebaaa4f8b5b64b9677d1df5d491ac01d638 | |
parent | 28ac0a9ef149c86090ba1a27def459365eabb6a9 (diff) | |
download | ydb-47a1a408ef865d9fa115bcbb3dcb8b652edaffe5.tar.gz |
Grace (thread safe) Resource Manager Shutdown
Использование IKqpResourceManager как указателя с имплементацией в виде актора не позволяет управлять его временем жизни. Особенно это проявляется при шатдуне AS. Выношу логику в отдельный (не актор) объект, сам актор держит его через shared_ptr и таким образом гарантирует существование IKqpResourceManager на время жизни актора, но при этом позволяет корректно отработать всем вызовам когда/если актор уже разрушен.
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.cpp | 431 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_ut.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 2 |
5 files changed, 234 insertions, 225 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 82e143790c3..74864f8bc67 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -590,7 +590,7 @@ private: Send(executer, ev.Release()); } - NRm::IKqpResourceManager* ResourceManager() { + std::shared_ptr<NRm::IKqpResourceManager> ResourceManager() { if (Y_LIKELY(ResourceManager_)) { return ResourceManager_; } @@ -606,7 +606,7 @@ private: NKikimrConfig::TTableServiceConfig::TResourceManager Config; TIntrusivePtr<TKqpCounters> Counters; IKqpNodeComputeActorFactory* CaFactory; - NRm::IKqpResourceManager* ResourceManager_ = nullptr; + std::shared_ptr<NRm::IKqpResourceManager> ResourceManager_; NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; //state sharded by TxId diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index c68af8de8e0..846d5458ce1 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -105,118 +105,47 @@ struct TTxStatesBucket { constexpr ui64 BucketsCount = 64; -struct TKqpNodeResourceManager { - ui32 NodeId; - IKqpResourceManager* Instance; - - explicit TKqpNodeResourceManager(ui32 nodeId, IKqpResourceManager* instance) - : NodeId(nodeId) - , Instance(instance) {} -}; +struct TEvPrivate { + enum EEv { + EvPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE), + EvSchedulePublishResources, + EvTakeResourcesSnapshot, + }; -struct TResourceManagers { - std::atomic<TKqpNodeResourceManager*> Default = nullptr; + struct TEvPublishResources : public TEventLocal<TEvPublishResources, EEv::EvPublishResources> { + }; - TMutex Lock; - std::unordered_map<ui32, TKqpNodeResourceManager*> ByNodeId; + struct TEvSchedulePublishResources : public TEventLocal<TEvSchedulePublishResources, EEv::EvSchedulePublishResources> { + }; - ~TResourceManagers() { - with_lock(Lock) { - for (auto [nodeId, rm] : ByNodeId) { - delete rm; - } - } - } + struct TEvTakeResourcesSnapshot : public TEventLocal<TEvTakeResourcesSnapshot, EEv::EvTakeResourcesSnapshot> { + std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)> Callback; + }; }; -TResourceManagers ResourceManagers; - -} // namespace - - -class TKqpResourceManagerActor : public IKqpResourceManager, public TActorBootstrapped<TKqpResourceManagerActor> { - using TBase = TActorBootstrapped<TKqpResourceManagerActor>; - - struct TEvPrivate { - enum EEv { - EvPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE), - EvSchedulePublishResources, - EvTakeResourcesSnapshot, - }; - - struct TEvPublishResources : public TEventLocal<TEvPublishResources, EEv::EvPublishResources> { - }; - - struct TEvSchedulePublishResources : public TEventLocal<TEvSchedulePublishResources, EEv::EvSchedulePublishResources> { - }; - - struct TEvTakeResourcesSnapshot : public TEventLocal<TEvTakeResourcesSnapshot, EEv::EvTakeResourcesSnapshot> { - std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)> Callback; - }; - }; +class TKqpResourceManager : public IKqpResourceManager { public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::KQP_RESOURCE_MANAGER; - } - TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, - TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId, - std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources) + TKqpResourceManager(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TIntrusivePtr<TKqpCounters> counters) : Config(config) , Counters(counters) - , ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID()) , ExecutionUnitsResource(Config.GetComputeActorsCount()) - , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) - , KqpProxySharedResources(std::move(kqpProxySharedResources)) - {} + , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) { - void Bootstrap() { - ActorSystem = TlsActivationContext->ActorSystem(); + } + + void Bootstrap(TActorSystem* actorSystem, TActorId selfId) { if (!Counters) { Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters); } - UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(), PatternCache, Counters->GetKqpCounters()); - - LOG_D("Start KqpResourceManagerActor at " << SelfId() << " with ResourceBroker at " << ResourceBrokerId); - - // Subscribe for tenant changes - Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe); - - // Subscribe for TableService config changes - ui32 tableServiceConfigKind = (ui32) NKikimrConsole::TConfigItem::TableServiceConfigItem; - - Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), - new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({tableServiceConfigKind}), - IEventHandle::FlagTrackDelivery); - - ToBroker(new TEvResourceBroker::TEvResourceBrokerRequest); - ToBroker(new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)); - - if (auto* mon = AppData()->Mon) { - NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors"); - mon->RegisterActorPage(actorsMonPage, "kqp_resource_manager", "KQP Resource Manager", false, - ActorSystem, SelfId()); - } - - WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()); - - Become(&TKqpResourceManagerActor::WorkState); - - AskSelfNodeInfo(); - SendWhiteboardRequest(); - - auto rm = new TKqpNodeResourceManager(SelfId().NodeId(), this); - with_lock (ResourceManagers.Lock) { - ResourceManagers.ByNodeId[SelfId().NodeId()] = rm; - } - ResourceManagers.Default.store(rm, std::memory_order_release); + ActorSystem = actorSystem; + SelfId = selfId; + UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes()); } -public: bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources, - TKqpNotEnoughResources* details) override - { + TKqpNotEnoughResources* details = nullptr) override { if (resources.MemoryPool == EKqpMemoryPool::DataQuery) { NotifyExternalResourcesAllocated(txId, taskId, resources); return true; @@ -295,7 +224,7 @@ public: bool allocated = ResourceBroker->SubmitTaskInstant( TEvResourceBroker::TEvSubmitTask(rbTaskId, rbTaskName, {0, resources.Memory}, "kqp_query", 0, {}), - SelfId()); + SelfId); if (!allocated) { auto unguard = ::Unguard(txBucket.Lock); @@ -333,7 +262,7 @@ public: taskState.ResourceBrokerTaskId = rbTaskId; } else { extraAlloc = true; - bool merged = ResourceBroker->MergeTasksInstant(taskState.ResourceBrokerTaskId, rbTaskId, SelfId()); + bool merged = ResourceBroker->MergeTasksInstant(taskState.ResourceBrokerTaskId, rbTaskId, SelfId); Y_VERIFY(merged); } } // with_lock (txBucket.Lock) @@ -350,36 +279,8 @@ public: return true; } - void SendWhiteboardRequest() { - auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest>(); - Send(WhiteBoardService, ev.release(), IEventHandle::FlagTrackDelivery, SelfId().NodeId()); - } - - void Handle(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) { - const auto& record = ev->Get()->Record; - if (record.SystemStateInfoSize() != 1) { - LOG_C("Unexpected whiteboard info"); - return; - } - - const auto& info = record.GetSystemStateInfo(0); - if (AppData()->UserPoolId >= info.PoolStatsSize()) { - LOG_C("Unexpected whiteboard info: pool size is smaller than user pool id" - << ", pool size: " << info.PoolStatsSize() - << ", user pool id: " << AppData()->UserPoolId); - return; - } - - const auto& pool = info.GetPoolStats(AppData()->UserPoolId); - - LOG_C("Received node white board pool stats: " << pool.usage()); - ProxyNodeResources.SetCpuUsage(pool.usage()); - ProxyNodeResources.SetThreads(pool.threads()); - } - bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources, - TResourcesAllocatedCallback&& onSuccess, TNotEnoughtResourcesCallback&& onFail, TDuration timeout) override - { + IKqpResourceManager::TResourcesAllocatedCallback&& onSuccess, IKqpResourceManager::TNotEnoughtResourcesCallback&& onFail, TDuration timeout = {}) override { Y_UNUSED(txId, taskId, resources, onSuccess, onFail, timeout); // TODO: for DataQuery resources only @@ -416,7 +317,7 @@ public: releaseExecutionUnits = taskIt->second.ExecutionUnits; bool finished = ResourceBroker->FinishTaskInstant( - TEvResourceBroker::TEvFinishTask(taskIt->second.ResourceBrokerTaskId), SelfId()); + TEvResourceBroker::TEvFinishTask(taskIt->second.ResourceBrokerTaskId), SelfId); Y_VERIFY_DEBUG(finished); remainsTasks = txIt->second.Tasks.size() - 1; @@ -469,7 +370,7 @@ public: for (auto& [taskId, taskState] : txIt->second.Tasks) { bool finished = ResourceBroker->FinishTaskInstant( - TEvResourceBroker::TEvFinishTask(taskState.ResourceBrokerTaskId), SelfId()); + TEvResourceBroker::TEvFinishTask(taskState.ResourceBrokerTaskId), SelfId); Y_VERIFY_DEBUG(finished); } @@ -594,7 +495,7 @@ public: LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, "Schedule Snapshot request"); auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>(); ev->Callback = std::move(callback); - TAutoPtr<IEventHandle> handle = new IEventHandle(SelfId(), SelfId(), ev.Release()); + TAutoPtr<IEventHandle> handle = new IEventHandle(SelfId, SelfId, ev.Release()); ActorSystem->Send(handle); } @@ -622,6 +523,154 @@ public: } } + ui32 GetNodeId() override { + return SelfId.NodeId(); + } + + TTxStatesBucket& TxBucket(ui64 txId) { + return Buckets[txId % Buckets.size()]; + } + + void FireResourcesPublishing() { + with_lock (Lock) { + if (PublishScheduledAt) { + return; + } + } + + ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources); + } + + void UpdatePatternCache(ui64 size) { + if (size) { + if (!PatternCache || PatternCache->GetMaxSize() != size) { + PatternCache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(size, Counters->GetKqpCounters()); + } + } else { + PatternCache.reset(); + } + } + + TActorId SelfId; + + NKikimrConfig::TTableServiceConfig::TResourceManager Config; // guarded by Lock + TIntrusivePtr<TKqpCounters> Counters; + TIntrusivePtr<NResourceBroker::IResourceBroker> ResourceBroker; + TActorSystem* ActorSystem = nullptr; + + // common guard + TAdaptiveLock Lock; + + // limits (guarded by Lock) + TLimitedResource<ui32> ExecutionUnitsResource; + TLimitedResource<ui64> ScanQueryMemoryResource; + ui64 ExternalDataQueryMemory = 0; + + // current state + std::array<TTxStatesBucket, BucketsCount> Buckets; + std::atomic<ui64> LastResourceBrokerTaskId = 0; + + // schedule info (guarded by Lock) + std::optional<TInstant> PublishScheduledAt; + + // pattern cache for different actors + std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache; +}; + +struct TResourceManagers { + std::weak_ptr<TKqpResourceManager> Default; + + TMutex Lock; + std::unordered_map<ui32, std::weak_ptr<TKqpResourceManager>> ByNodeId; +}; + +TResourceManagers ResourceManagers; + +} // namespace + + +class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerActor> { + using TBase = TActorBootstrapped<TKqpResourceManagerActor>; + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::KQP_RESOURCE_MANAGER; + } + + TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, + TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId, + std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources) + : ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID()) + , KqpProxySharedResources(std::move(kqpProxySharedResources)) + { + ResourceManager = std::make_shared<TKqpResourceManager>(config, counters); + } + + void Bootstrap() { + ResourceManager->Bootstrap(TlsActivationContext->ActorSystem(), SelfId()); + + LOG_D("Start KqpResourceManagerActor at " << SelfId() << " with ResourceBroker at " << ResourceBrokerId); + + // Subscribe for tenant changes + Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe); + + // Subscribe for TableService config changes + ui32 tableServiceConfigKind = (ui32) NKikimrConsole::TConfigItem::TableServiceConfigItem; + + Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), + new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({tableServiceConfigKind}), + IEventHandle::FlagTrackDelivery); + + ToBroker(new TEvResourceBroker::TEvResourceBrokerRequest); + ToBroker(new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)); + + if (auto* mon = AppData()->Mon) { + NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors"); + mon->RegisterActorPage(actorsMonPage, "kqp_resource_manager", "KQP Resource Manager", false, + ResourceManager->ActorSystem, SelfId()); + } + + WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()); + + Become(&TKqpResourceManagerActor::WorkState); + + AskSelfNodeInfo(); + SendWhiteboardRequest(); + + with_lock (ResourceManagers.Lock) { + ResourceManagers.ByNodeId[SelfId().NodeId()] = ResourceManager; + ResourceManagers.Default = ResourceManager; + } + } + +public: + void SendWhiteboardRequest() { + auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest>(); + Send(WhiteBoardService, ev.release(), IEventHandle::FlagTrackDelivery, SelfId().NodeId()); + } + + void Handle(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) { + const auto& record = ev->Get()->Record; + if (record.SystemStateInfoSize() != 1) { + LOG_C("Unexpected whiteboard info"); + return; + } + + const auto& info = record.GetSystemStateInfo(0); + if (AppData()->UserPoolId >= info.PoolStatsSize()) { + LOG_C("Unexpected whiteboard info: pool size is smaller than user pool id" + << ", pool size: " << info.PoolStatsSize() + << ", user pool id: " << AppData()->UserPoolId); + return; + } + + const auto& pool = info.GetPoolStats(AppData()->UserPoolId); + + LOG_C("Received node white board pool stats: " << pool.usage()); + ProxyNodeResources.SetCpuUsage(pool.usage()); + ProxyNodeResources.SetThreads(pool.threads()); + } + private: STATEFN(WorkState) { switch (ev->GetTypeRewrite()) { @@ -646,8 +695,8 @@ private: } void HandleWork(TEvPrivate::TEvPublishResources::TPtr&) { - with_lock (Lock) { - PublishScheduledAt.reset(); + with_lock (ResourceManager->Lock) { + ResourceManager->PublishScheduledAt.reset(); } PublishResourceUsage("batching"); @@ -683,16 +732,16 @@ private: auto& queueConfig = *ev->Get()->QueueConfig; if (queueConfig.GetLimit().GetMemory() > 0) { - with_lock (Lock) { - ScanQueryMemoryResource.SetNewLimit(queueConfig.GetLimit().GetMemory()); + with_lock (ResourceManager->Lock) { + ResourceManager->ScanQueryMemoryResource.SetNewLimit(queueConfig.GetLimit().GetMemory()); } LOG_I("Total node memory for scan queries: " << queueConfig.GetLimit().GetMemory() << " bytes"); } } void HandleWork(TEvResourceBroker::TEvResourceBrokerResponse::TPtr& ev) { - with_lock (Lock) { - ResourceBroker = ev->Get()->ResourceBroker; + with_lock (ResourceManager->Lock) { + ResourceManager->ResourceBroker = ev->Get()->ResourceBroker; } } @@ -742,22 +791,12 @@ private: LOG_D("Subscribed for config changes"); } - static void UpdatePatternCache(ui64 size, std::shared_ptr<NMiniKQL::TComputationPatternLRUCache>& cache, NMonitoring::TDynamicCounterPtr counters) { - if (size) { - if (!cache || cache->GetMaxSize() != size) { - cache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(size, counters); - } - } else { - cache.reset(); - } - } - void HandleWork(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { auto& event = ev->Get()->Record; Send(ev->Sender, new NConsole::TEvConsole::TEvConfigNotificationResponse(event), IEventHandle::FlagTrackDelivery, ev->Cookie); auto& config = *event.MutableConfig()->MutableTableServiceConfig()->MutableResourceManager(); - UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(), PatternCache, Counters->GetKqpCounters()); + ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes()); #define FORCE_VALUE(name) if (!config.Has ## name ()) config.Set ## name(config.Get ## name()); FORCE_VALUE(ComputeActorsCount) @@ -773,9 +812,9 @@ private: LOG_I("Updated table service config: " << config.DebugString()); - with_lock (Lock) { - ExecutionUnitsResource.SetNewLimit(config.GetComputeActorsCount()); - Config.Swap(&config); + with_lock (ResourceManager->Lock) { + ResourceManager->ExecutionUnitsResource.SetNewLimit(config.GetComputeActorsCount()); + ResourceManager->Config.Swap(&config); } } @@ -805,24 +844,24 @@ private: HTML(str) { PRE() { str << "Current config:" << Endl; - with_lock (Lock) { - str << Config.DebugString() << Endl; + with_lock (ResourceManager->Lock) { + str << ResourceManager->Config.DebugString() << Endl; } str << "State storage key: " << WbState.Tenant << Endl; - with_lock (Lock) { - str << "ScanQuery memory resource: " << ScanQueryMemoryResource.ToString() << Endl; - str << "External DataQuery memory: " << ExternalDataQueryMemory << Endl; - str << "ExecutionUnits resource: " << ExecutionUnitsResource.ToString() << Endl; + with_lock (ResourceManager->Lock) { + str << "ScanQuery memory resource: " << ResourceManager->ScanQueryMemoryResource.ToString() << Endl; + str << "External DataQuery memory: " << ResourceManager->ExternalDataQueryMemory << Endl; + str << "ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource.ToString() << Endl; } - str << "Last resource broker task id: " << LastResourceBrokerTaskId.load() << Endl; + str << "Last resource broker task id: " << ResourceManager->LastResourceBrokerTaskId.load() << Endl; if (WbState.LastPublishTime) { str << "Last publish time: " << *WbState.LastPublishTime << Endl; } std::optional<TInstant> publishScheduledAt; - with_lock (Lock) { - publishScheduledAt = PublishScheduledAt; + with_lock (ResourceManager->Lock) { + publishScheduledAt = ResourceManager->PublishScheduledAt; } if (publishScheduledAt) { @@ -830,7 +869,7 @@ private: } str << Endl << "Transactions:" << Endl; - for (auto& bucket : Buckets) { + for (auto& bucket : ResourceManager->Buckets) { with_lock (bucket.Lock) { for (auto& [txId, txState] : bucket.Txs) { str << " TxId: " << txId << Endl; @@ -857,10 +896,6 @@ private: } private: - TTxStatesBucket& TxBucket(ui64 txId) { - return Buckets[txId % Buckets.size()]; - } - void PassAway() override { ToBroker(new TEvResourceBroker::TEvNotifyActorDied); TActor::PassAway(); @@ -874,35 +909,25 @@ private: return TStringBuilder() << "kqprm+" << database; } - void FireResourcesPublishing() { - with_lock (Lock) { - if (PublishScheduledAt) { - return; - } - } - - ActorSystem->Send(SelfId(), new TEvPrivate::TEvSchedulePublishResources); - } - void PublishResourceUsage(TStringBuf reason) { TDuration publishInterval; std::optional<TInstant> publishScheduledAt; - with_lock (Lock) { - publishInterval = TDuration::Seconds(Config.GetPublishStatisticsIntervalSec()); - publishScheduledAt = PublishScheduledAt; + with_lock (ResourceManager->Lock) { + publishInterval = TDuration::Seconds(ResourceManager->Config.GetPublishStatisticsIntervalSec()); + publishScheduledAt = ResourceManager->PublishScheduledAt; } if (publishScheduledAt) { return; } - auto now = ActorSystem->Timestamp(); + auto now = ResourceManager->ActorSystem->Timestamp(); if (publishInterval && WbState.LastPublishTime && now - *WbState.LastPublishTime < publishInterval) { publishScheduledAt = *WbState.LastPublishTime + publishInterval; - with_lock (Lock) { - PublishScheduledAt = publishScheduledAt; + with_lock (ResourceManager->Lock) { + ResourceManager->PublishScheduledAt = publishScheduledAt; } Schedule(*publishScheduledAt - now, new TEvPrivate::TEvPublishResources); @@ -934,15 +959,15 @@ private: } *proxyNodeResources = ProxyNodeResources; ActorIdToProto(MakeKqpResourceManagerServiceID(SelfId().NodeId()), payload.MutableResourceManagerActorId()); // legacy - with_lock (Lock) { - payload.SetAvailableComputeActors(ExecutionUnitsResource.Available()); // legacy - payload.SetTotalMemory(ScanQueryMemoryResource.GetLimit()); // legacy - payload.SetUsedMemory(ScanQueryMemoryResource.GetLimit() - ScanQueryMemoryResource.Available()); // legacy + with_lock (ResourceManager->Lock) { + payload.SetAvailableComputeActors(ResourceManager->ExecutionUnitsResource.Available()); // legacy + payload.SetTotalMemory(ResourceManager->ScanQueryMemoryResource.GetLimit()); // legacy + payload.SetUsedMemory(ResourceManager->ScanQueryMemoryResource.GetLimit() - ResourceManager->ScanQueryMemoryResource.Available()); // legacy - payload.SetExecutionUnits(ExecutionUnitsResource.Available()); + payload.SetExecutionUnits(ResourceManager->ExecutionUnitsResource.Available()); auto* pool = payload.MutableMemory()->Add(); pool->SetPool(EKqpMemoryPool::ScanQuery); - pool->SetAvailable(ScanQueryMemoryResource.Available()); + pool->SetAvailable(ResourceManager->ScanQueryMemoryResource.Available()); } auto boardPublisher = CreateBoardPublishActor(WbState.BoardPath, payload.SerializeAsString(), SelfId(), @@ -957,26 +982,7 @@ private: } private: - NKikimrConfig::TTableServiceConfig::TResourceManager Config; // guarded by Lock - TIntrusivePtr<TKqpCounters> Counters; const TActorId ResourceBrokerId; - TIntrusivePtr<NResourceBroker::IResourceBroker> ResourceBroker; - TActorSystem* ActorSystem = nullptr; - - // common guard - TAdaptiveLock Lock; - - // limits (guarded by Lock) - TLimitedResource<ui32> ExecutionUnitsResource; - TLimitedResource<ui64> ScanQueryMemoryResource; - ui64 ExternalDataQueryMemory = 0; - - // current state - std::array<TTxStatesBucket, BucketsCount> Buckets; - std::atomic<ui64> LastResourceBrokerTaskId = 0; - - // schedule info (guarded by Lock) - std::optional<TInstant> PublishScheduledAt; // Whiteboard specific fields struct TWhiteBoardState { @@ -988,13 +994,12 @@ private: }; TWhiteBoardState WbState; - // pattern cache for different actors - std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache; - std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources; NKikimrKqp::TKqpProxyNodeResources ProxyNodeResources; TActorId WhiteBoardService; + + std::shared_ptr<TKqpResourceManager> ResourceManager; }; } // namespace NRm @@ -1007,8 +1012,8 @@ NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServic return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker, std::move(kqpProxySharedResources)); } -NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> _nodeId) { - if (auto* rm = TryGetKqpResourceManager(_nodeId)) { +std::shared_ptr<NRm::IKqpResourceManager> GetKqpResourceManager(TMaybe<ui32> _nodeId) { + if (auto rm = TryGetKqpResourceManager(_nodeId)) { return rm; } @@ -1016,18 +1021,18 @@ NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> _nodeId) { Y_FAIL("KqpResourceManager not ready yet, node #%" PRIu32, nodeId); } -NRm::IKqpResourceManager* TryGetKqpResourceManager(TMaybe<ui32> _nodeId) { +std::shared_ptr<NRm::IKqpResourceManager> TryGetKqpResourceManager(TMaybe<ui32> _nodeId) { ui32 nodeId = _nodeId ? *_nodeId : TActivationContext::ActorSystem()->NodeId; - auto rm = NRm::ResourceManagers.Default.load(std::memory_order_acquire); - if (Y_LIKELY(rm && rm->NodeId == nodeId)) { - return rm->Instance; + auto rm = NRm::ResourceManagers.Default.lock(); + if (Y_LIKELY(rm && rm->GetNodeId() == nodeId)) { + return rm; } // for tests only with_lock (NRm::ResourceManagers.Lock) { auto it = NRm::ResourceManagers.ByNodeId.find(nodeId); if (it != NRm::ResourceManagers.ByNodeId.end()) { - return it->second->Instance; + return it->second.lock(); } } diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index 49f0d0db904..d77e9072526 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -91,6 +91,10 @@ public: virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0; virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetPatternCache() = 0; + + virtual ui32 GetNodeId() { + return 0; + } }; @@ -108,8 +112,8 @@ NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServic TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {}, std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources = nullptr); -NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> nodeId = Nothing()); -NRm::IKqpResourceManager* TryGetKqpResourceManager(TMaybe<ui32> nodeId = Nothing()); +std::shared_ptr<NRm::IKqpResourceManager> GetKqpResourceManager(TMaybe<ui32> nodeId = Nothing()); +std::shared_ptr<NRm::IKqpResourceManager> TryGetKqpResourceManager(TMaybe<ui32> nodeId = Nothing()); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp index 9e8b1f3926f..7f77cc9d67a 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp @@ -148,7 +148,7 @@ public: UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("InFlyTasks")->Val(), infly); } - void AssertResourceManagerStats(const NRm::IKqpResourceManager* rm, ui64 scanQueryMemory, ui32 executionUnits) { + void AssertResourceManagerStats(std::shared_ptr<NRm::IKqpResourceManager> rm, ui64 scanQueryMemory, ui32 executionUnits) { auto stats = rm->GetLocalResources(); UNIT_ASSERT_VALUES_EQUAL(scanQueryMemory, stats.Memory[NRm::EKqpMemoryPool::ScanQuery]); UNIT_ASSERT_VALUES_EQUAL(executionUnits, stats.ExecutionUnits); @@ -184,7 +184,7 @@ void KqpRm::SingleTask() { CreateKqpResourceManager(MakeKqpResourceManagerConfig()); NKikimr::TActorSystemStub stub; - auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -206,7 +206,7 @@ void KqpRm::ManyTasks() { CreateKqpResourceManager(MakeKqpResourceManagerConfig()); NKikimr::TActorSystemStub stub; - auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -244,7 +244,7 @@ void KqpRm::NotEnoughMemory() { CreateKqpResourceManager(MakeKqpResourceManagerConfig()); NKikimr::TActorSystemStub stub; - auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -262,7 +262,7 @@ void KqpRm::NotEnoughExecutionUnits() { CreateKqpResourceManager(MakeKqpResourceManagerConfig()); NKikimr::TActorSystemStub stub; - auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 1000; @@ -282,7 +282,7 @@ void KqpRm::ResourceBrokerNotEnoughResources() { CreateKqpResourceManager(config); NKikimr::TActorSystemStub stub; - auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; @@ -305,7 +305,7 @@ void KqpRm::Snapshot() { CreateKqpResourceManager(MakeKqpResourceManagerConfig()); NKikimr::TActorSystemStub stub; - auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); NRm::TKqpResourcesRequest request; request.ExecutionUnits = 10; diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 24111f604af..164351e7799 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -1055,7 +1055,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor KqpExecCtx.ApplyCtx = KqpApplyCtx.Get(); KqpExecCtx.Alloc = KqpAlloc.Get(); KqpExecCtx.TypeEnv = KqpTypeEnv.Get(); - if (auto* rm = NKqp::TryGetKqpResourceManager()) { + if (auto rm = NKqp::TryGetKqpResourceManager()) { KqpExecCtx.PatternCache = rm->GetPatternCache(); } } |