aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-05-10 14:13:12 +0300
committerhor911 <hor911@ydb.tech>2023-05-10 14:13:12 +0300
commit47a1a408ef865d9fa115bcbb3dcb8b652edaffe5 (patch)
tree714caebaaa4f8b5b64b9677d1df5d491ac01d638
parent28ac0a9ef149c86090ba1a27def459365eabb6a9 (diff)
downloadydb-47a1a408ef865d9fa115bcbb3dcb8b652edaffe5.tar.gz
Grace (thread safe) Resource Manager Shutdown
Использование IKqpResourceManager как указателя с имплементацией в виде актора не позволяет управлять его временем жизни. Особенно это проявляется при шатдуне AS. Выношу логику в отдельный (не актор) объект, сам актор держит его через shared_ptr и таким образом гарантирует существование IKqpResourceManager на время жизни актора, но при этом позволяет корректно отработать всем вызовам когда/если актор уже разрушен.
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp4
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.cpp431
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.h8
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_ut.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp2
5 files changed, 234 insertions, 225 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 82e143790c3..74864f8bc67 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -590,7 +590,7 @@ private:
Send(executer, ev.Release());
}
- NRm::IKqpResourceManager* ResourceManager() {
+ std::shared_ptr<NRm::IKqpResourceManager> ResourceManager() {
if (Y_LIKELY(ResourceManager_)) {
return ResourceManager_;
}
@@ -606,7 +606,7 @@ private:
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
TIntrusivePtr<TKqpCounters> Counters;
IKqpNodeComputeActorFactory* CaFactory;
- NRm::IKqpResourceManager* ResourceManager_ = nullptr;
+ std::shared_ptr<NRm::IKqpResourceManager> ResourceManager_;
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
//state sharded by TxId
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index c68af8de8e0..846d5458ce1 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -105,118 +105,47 @@ struct TTxStatesBucket {
constexpr ui64 BucketsCount = 64;
-struct TKqpNodeResourceManager {
- ui32 NodeId;
- IKqpResourceManager* Instance;
-
- explicit TKqpNodeResourceManager(ui32 nodeId, IKqpResourceManager* instance)
- : NodeId(nodeId)
- , Instance(instance) {}
-};
+struct TEvPrivate {
+ enum EEv {
+ EvPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvSchedulePublishResources,
+ EvTakeResourcesSnapshot,
+ };
-struct TResourceManagers {
- std::atomic<TKqpNodeResourceManager*> Default = nullptr;
+ struct TEvPublishResources : public TEventLocal<TEvPublishResources, EEv::EvPublishResources> {
+ };
- TMutex Lock;
- std::unordered_map<ui32, TKqpNodeResourceManager*> ByNodeId;
+ struct TEvSchedulePublishResources : public TEventLocal<TEvSchedulePublishResources, EEv::EvSchedulePublishResources> {
+ };
- ~TResourceManagers() {
- with_lock(Lock) {
- for (auto [nodeId, rm] : ByNodeId) {
- delete rm;
- }
- }
- }
+ struct TEvTakeResourcesSnapshot : public TEventLocal<TEvTakeResourcesSnapshot, EEv::EvTakeResourcesSnapshot> {
+ std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)> Callback;
+ };
};
-TResourceManagers ResourceManagers;
-
-} // namespace
-
-
-class TKqpResourceManagerActor : public IKqpResourceManager, public TActorBootstrapped<TKqpResourceManagerActor> {
- using TBase = TActorBootstrapped<TKqpResourceManagerActor>;
-
- struct TEvPrivate {
- enum EEv {
- EvPublishResources = EventSpaceBegin(TEvents::ES_PRIVATE),
- EvSchedulePublishResources,
- EvTakeResourcesSnapshot,
- };
-
- struct TEvPublishResources : public TEventLocal<TEvPublishResources, EEv::EvPublishResources> {
- };
-
- struct TEvSchedulePublishResources : public TEventLocal<TEvSchedulePublishResources, EEv::EvSchedulePublishResources> {
- };
-
- struct TEvTakeResourcesSnapshot : public TEventLocal<TEvTakeResourcesSnapshot, EEv::EvTakeResourcesSnapshot> {
- std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)> Callback;
- };
- };
+class TKqpResourceManager : public IKqpResourceManager {
public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::KQP_RESOURCE_MANAGER;
- }
- TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
- TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId,
- std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources)
+ TKqpResourceManager(const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TIntrusivePtr<TKqpCounters> counters)
: Config(config)
, Counters(counters)
- , ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID())
, ExecutionUnitsResource(Config.GetComputeActorsCount())
- , ScanQueryMemoryResource(Config.GetQueryMemoryLimit())
- , KqpProxySharedResources(std::move(kqpProxySharedResources))
- {}
+ , ScanQueryMemoryResource(Config.GetQueryMemoryLimit()) {
- void Bootstrap() {
- ActorSystem = TlsActivationContext->ActorSystem();
+ }
+
+ void Bootstrap(TActorSystem* actorSystem, TActorId selfId) {
if (!Counters) {
Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters);
}
- UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes(), PatternCache, Counters->GetKqpCounters());
-
- LOG_D("Start KqpResourceManagerActor at " << SelfId() << " with ResourceBroker at " << ResourceBrokerId);
-
- // Subscribe for tenant changes
- Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe);
-
- // Subscribe for TableService config changes
- ui32 tableServiceConfigKind = (ui32) NKikimrConsole::TConfigItem::TableServiceConfigItem;
-
- Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()),
- new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({tableServiceConfigKind}),
- IEventHandle::FlagTrackDelivery);
-
- ToBroker(new TEvResourceBroker::TEvResourceBrokerRequest);
- ToBroker(new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue));
-
- if (auto* mon = AppData()->Mon) {
- NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
- mon->RegisterActorPage(actorsMonPage, "kqp_resource_manager", "KQP Resource Manager", false,
- ActorSystem, SelfId());
- }
-
- WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId());
-
- Become(&TKqpResourceManagerActor::WorkState);
-
- AskSelfNodeInfo();
- SendWhiteboardRequest();
-
- auto rm = new TKqpNodeResourceManager(SelfId().NodeId(), this);
- with_lock (ResourceManagers.Lock) {
- ResourceManagers.ByNodeId[SelfId().NodeId()] = rm;
- }
- ResourceManagers.Default.store(rm, std::memory_order_release);
+ ActorSystem = actorSystem;
+ SelfId = selfId;
+ UpdatePatternCache(Config.GetKqpPatternCacheCapacityBytes());
}
-public:
bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
- TKqpNotEnoughResources* details) override
- {
+ TKqpNotEnoughResources* details = nullptr) override {
if (resources.MemoryPool == EKqpMemoryPool::DataQuery) {
NotifyExternalResourcesAllocated(txId, taskId, resources);
return true;
@@ -295,7 +224,7 @@ public:
bool allocated = ResourceBroker->SubmitTaskInstant(
TEvResourceBroker::TEvSubmitTask(rbTaskId, rbTaskName, {0, resources.Memory}, "kqp_query", 0, {}),
- SelfId());
+ SelfId);
if (!allocated) {
auto unguard = ::Unguard(txBucket.Lock);
@@ -333,7 +262,7 @@ public:
taskState.ResourceBrokerTaskId = rbTaskId;
} else {
extraAlloc = true;
- bool merged = ResourceBroker->MergeTasksInstant(taskState.ResourceBrokerTaskId, rbTaskId, SelfId());
+ bool merged = ResourceBroker->MergeTasksInstant(taskState.ResourceBrokerTaskId, rbTaskId, SelfId);
Y_VERIFY(merged);
}
} // with_lock (txBucket.Lock)
@@ -350,36 +279,8 @@ public:
return true;
}
- void SendWhiteboardRequest() {
- auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest>();
- Send(WhiteBoardService, ev.release(), IEventHandle::FlagTrackDelivery, SelfId().NodeId());
- }
-
- void Handle(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) {
- const auto& record = ev->Get()->Record;
- if (record.SystemStateInfoSize() != 1) {
- LOG_C("Unexpected whiteboard info");
- return;
- }
-
- const auto& info = record.GetSystemStateInfo(0);
- if (AppData()->UserPoolId >= info.PoolStatsSize()) {
- LOG_C("Unexpected whiteboard info: pool size is smaller than user pool id"
- << ", pool size: " << info.PoolStatsSize()
- << ", user pool id: " << AppData()->UserPoolId);
- return;
- }
-
- const auto& pool = info.GetPoolStats(AppData()->UserPoolId);
-
- LOG_C("Received node white board pool stats: " << pool.usage());
- ProxyNodeResources.SetCpuUsage(pool.usage());
- ProxyNodeResources.SetThreads(pool.threads());
- }
-
bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
- TResourcesAllocatedCallback&& onSuccess, TNotEnoughtResourcesCallback&& onFail, TDuration timeout) override
- {
+ IKqpResourceManager::TResourcesAllocatedCallback&& onSuccess, IKqpResourceManager::TNotEnoughtResourcesCallback&& onFail, TDuration timeout = {}) override {
Y_UNUSED(txId, taskId, resources, onSuccess, onFail, timeout);
// TODO: for DataQuery resources only
@@ -416,7 +317,7 @@ public:
releaseExecutionUnits = taskIt->second.ExecutionUnits;
bool finished = ResourceBroker->FinishTaskInstant(
- TEvResourceBroker::TEvFinishTask(taskIt->second.ResourceBrokerTaskId), SelfId());
+ TEvResourceBroker::TEvFinishTask(taskIt->second.ResourceBrokerTaskId), SelfId);
Y_VERIFY_DEBUG(finished);
remainsTasks = txIt->second.Tasks.size() - 1;
@@ -469,7 +370,7 @@ public:
for (auto& [taskId, taskState] : txIt->second.Tasks) {
bool finished = ResourceBroker->FinishTaskInstant(
- TEvResourceBroker::TEvFinishTask(taskState.ResourceBrokerTaskId), SelfId());
+ TEvResourceBroker::TEvFinishTask(taskState.ResourceBrokerTaskId), SelfId);
Y_VERIFY_DEBUG(finished);
}
@@ -594,7 +495,7 @@ public:
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, "Schedule Snapshot request");
auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>();
ev->Callback = std::move(callback);
- TAutoPtr<IEventHandle> handle = new IEventHandle(SelfId(), SelfId(), ev.Release());
+ TAutoPtr<IEventHandle> handle = new IEventHandle(SelfId, SelfId, ev.Release());
ActorSystem->Send(handle);
}
@@ -622,6 +523,154 @@ public:
}
}
+ ui32 GetNodeId() override {
+ return SelfId.NodeId();
+ }
+
+ TTxStatesBucket& TxBucket(ui64 txId) {
+ return Buckets[txId % Buckets.size()];
+ }
+
+ void FireResourcesPublishing() {
+ with_lock (Lock) {
+ if (PublishScheduledAt) {
+ return;
+ }
+ }
+
+ ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources);
+ }
+
+ void UpdatePatternCache(ui64 size) {
+ if (size) {
+ if (!PatternCache || PatternCache->GetMaxSize() != size) {
+ PatternCache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(size, Counters->GetKqpCounters());
+ }
+ } else {
+ PatternCache.reset();
+ }
+ }
+
+ TActorId SelfId;
+
+ NKikimrConfig::TTableServiceConfig::TResourceManager Config; // guarded by Lock
+ TIntrusivePtr<TKqpCounters> Counters;
+ TIntrusivePtr<NResourceBroker::IResourceBroker> ResourceBroker;
+ TActorSystem* ActorSystem = nullptr;
+
+ // common guard
+ TAdaptiveLock Lock;
+
+ // limits (guarded by Lock)
+ TLimitedResource<ui32> ExecutionUnitsResource;
+ TLimitedResource<ui64> ScanQueryMemoryResource;
+ ui64 ExternalDataQueryMemory = 0;
+
+ // current state
+ std::array<TTxStatesBucket, BucketsCount> Buckets;
+ std::atomic<ui64> LastResourceBrokerTaskId = 0;
+
+ // schedule info (guarded by Lock)
+ std::optional<TInstant> PublishScheduledAt;
+
+ // pattern cache for different actors
+ std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache;
+};
+
+struct TResourceManagers {
+ std::weak_ptr<TKqpResourceManager> Default;
+
+ TMutex Lock;
+ std::unordered_map<ui32, std::weak_ptr<TKqpResourceManager>> ByNodeId;
+};
+
+TResourceManagers ResourceManagers;
+
+} // namespace
+
+
+class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerActor> {
+ using TBase = TActorBootstrapped<TKqpResourceManagerActor>;
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::KQP_RESOURCE_MANAGER;
+ }
+
+ TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
+ TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId,
+ std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources)
+ : ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID())
+ , KqpProxySharedResources(std::move(kqpProxySharedResources))
+ {
+ ResourceManager = std::make_shared<TKqpResourceManager>(config, counters);
+ }
+
+ void Bootstrap() {
+ ResourceManager->Bootstrap(TlsActivationContext->ActorSystem(), SelfId());
+
+ LOG_D("Start KqpResourceManagerActor at " << SelfId() << " with ResourceBroker at " << ResourceBrokerId);
+
+ // Subscribe for tenant changes
+ Send(MakeTenantPoolRootID(), new TEvents::TEvSubscribe);
+
+ // Subscribe for TableService config changes
+ ui32 tableServiceConfigKind = (ui32) NKikimrConsole::TConfigItem::TableServiceConfigItem;
+
+ Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()),
+ new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({tableServiceConfigKind}),
+ IEventHandle::FlagTrackDelivery);
+
+ ToBroker(new TEvResourceBroker::TEvResourceBrokerRequest);
+ ToBroker(new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue));
+
+ if (auto* mon = AppData()->Mon) {
+ NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
+ mon->RegisterActorPage(actorsMonPage, "kqp_resource_manager", "KQP Resource Manager", false,
+ ResourceManager->ActorSystem, SelfId());
+ }
+
+ WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId());
+
+ Become(&TKqpResourceManagerActor::WorkState);
+
+ AskSelfNodeInfo();
+ SendWhiteboardRequest();
+
+ with_lock (ResourceManagers.Lock) {
+ ResourceManagers.ByNodeId[SelfId().NodeId()] = ResourceManager;
+ ResourceManagers.Default = ResourceManager;
+ }
+ }
+
+public:
+ void SendWhiteboardRequest() {
+ auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest>();
+ Send(WhiteBoardService, ev.release(), IEventHandle::FlagTrackDelivery, SelfId().NodeId());
+ }
+
+ void Handle(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) {
+ const auto& record = ev->Get()->Record;
+ if (record.SystemStateInfoSize() != 1) {
+ LOG_C("Unexpected whiteboard info");
+ return;
+ }
+
+ const auto& info = record.GetSystemStateInfo(0);
+ if (AppData()->UserPoolId >= info.PoolStatsSize()) {
+ LOG_C("Unexpected whiteboard info: pool size is smaller than user pool id"
+ << ", pool size: " << info.PoolStatsSize()
+ << ", user pool id: " << AppData()->UserPoolId);
+ return;
+ }
+
+ const auto& pool = info.GetPoolStats(AppData()->UserPoolId);
+
+ LOG_C("Received node white board pool stats: " << pool.usage());
+ ProxyNodeResources.SetCpuUsage(pool.usage());
+ ProxyNodeResources.SetThreads(pool.threads());
+ }
+
private:
STATEFN(WorkState) {
switch (ev->GetTypeRewrite()) {
@@ -646,8 +695,8 @@ private:
}
void HandleWork(TEvPrivate::TEvPublishResources::TPtr&) {
- with_lock (Lock) {
- PublishScheduledAt.reset();
+ with_lock (ResourceManager->Lock) {
+ ResourceManager->PublishScheduledAt.reset();
}
PublishResourceUsage("batching");
@@ -683,16 +732,16 @@ private:
auto& queueConfig = *ev->Get()->QueueConfig;
if (queueConfig.GetLimit().GetMemory() > 0) {
- with_lock (Lock) {
- ScanQueryMemoryResource.SetNewLimit(queueConfig.GetLimit().GetMemory());
+ with_lock (ResourceManager->Lock) {
+ ResourceManager->ScanQueryMemoryResource.SetNewLimit(queueConfig.GetLimit().GetMemory());
}
LOG_I("Total node memory for scan queries: " << queueConfig.GetLimit().GetMemory() << " bytes");
}
}
void HandleWork(TEvResourceBroker::TEvResourceBrokerResponse::TPtr& ev) {
- with_lock (Lock) {
- ResourceBroker = ev->Get()->ResourceBroker;
+ with_lock (ResourceManager->Lock) {
+ ResourceManager->ResourceBroker = ev->Get()->ResourceBroker;
}
}
@@ -742,22 +791,12 @@ private:
LOG_D("Subscribed for config changes");
}
- static void UpdatePatternCache(ui64 size, std::shared_ptr<NMiniKQL::TComputationPatternLRUCache>& cache, NMonitoring::TDynamicCounterPtr counters) {
- if (size) {
- if (!cache || cache->GetMaxSize() != size) {
- cache = std::make_shared<NMiniKQL::TComputationPatternLRUCache>(size, counters);
- }
- } else {
- cache.reset();
- }
- }
-
void HandleWork(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
auto& event = ev->Get()->Record;
Send(ev->Sender, new NConsole::TEvConsole::TEvConfigNotificationResponse(event), IEventHandle::FlagTrackDelivery, ev->Cookie);
auto& config = *event.MutableConfig()->MutableTableServiceConfig()->MutableResourceManager();
- UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(), PatternCache, Counters->GetKqpCounters());
+ ResourceManager->UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes());
#define FORCE_VALUE(name) if (!config.Has ## name ()) config.Set ## name(config.Get ## name());
FORCE_VALUE(ComputeActorsCount)
@@ -773,9 +812,9 @@ private:
LOG_I("Updated table service config: " << config.DebugString());
- with_lock (Lock) {
- ExecutionUnitsResource.SetNewLimit(config.GetComputeActorsCount());
- Config.Swap(&config);
+ with_lock (ResourceManager->Lock) {
+ ResourceManager->ExecutionUnitsResource.SetNewLimit(config.GetComputeActorsCount());
+ ResourceManager->Config.Swap(&config);
}
}
@@ -805,24 +844,24 @@ private:
HTML(str) {
PRE() {
str << "Current config:" << Endl;
- with_lock (Lock) {
- str << Config.DebugString() << Endl;
+ with_lock (ResourceManager->Lock) {
+ str << ResourceManager->Config.DebugString() << Endl;
}
str << "State storage key: " << WbState.Tenant << Endl;
- with_lock (Lock) {
- str << "ScanQuery memory resource: " << ScanQueryMemoryResource.ToString() << Endl;
- str << "External DataQuery memory: " << ExternalDataQueryMemory << Endl;
- str << "ExecutionUnits resource: " << ExecutionUnitsResource.ToString() << Endl;
+ with_lock (ResourceManager->Lock) {
+ str << "ScanQuery memory resource: " << ResourceManager->ScanQueryMemoryResource.ToString() << Endl;
+ str << "External DataQuery memory: " << ResourceManager->ExternalDataQueryMemory << Endl;
+ str << "ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource.ToString() << Endl;
}
- str << "Last resource broker task id: " << LastResourceBrokerTaskId.load() << Endl;
+ str << "Last resource broker task id: " << ResourceManager->LastResourceBrokerTaskId.load() << Endl;
if (WbState.LastPublishTime) {
str << "Last publish time: " << *WbState.LastPublishTime << Endl;
}
std::optional<TInstant> publishScheduledAt;
- with_lock (Lock) {
- publishScheduledAt = PublishScheduledAt;
+ with_lock (ResourceManager->Lock) {
+ publishScheduledAt = ResourceManager->PublishScheduledAt;
}
if (publishScheduledAt) {
@@ -830,7 +869,7 @@ private:
}
str << Endl << "Transactions:" << Endl;
- for (auto& bucket : Buckets) {
+ for (auto& bucket : ResourceManager->Buckets) {
with_lock (bucket.Lock) {
for (auto& [txId, txState] : bucket.Txs) {
str << " TxId: " << txId << Endl;
@@ -857,10 +896,6 @@ private:
}
private:
- TTxStatesBucket& TxBucket(ui64 txId) {
- return Buckets[txId % Buckets.size()];
- }
-
void PassAway() override {
ToBroker(new TEvResourceBroker::TEvNotifyActorDied);
TActor::PassAway();
@@ -874,35 +909,25 @@ private:
return TStringBuilder() << "kqprm+" << database;
}
- void FireResourcesPublishing() {
- with_lock (Lock) {
- if (PublishScheduledAt) {
- return;
- }
- }
-
- ActorSystem->Send(SelfId(), new TEvPrivate::TEvSchedulePublishResources);
- }
-
void PublishResourceUsage(TStringBuf reason) {
TDuration publishInterval;
std::optional<TInstant> publishScheduledAt;
- with_lock (Lock) {
- publishInterval = TDuration::Seconds(Config.GetPublishStatisticsIntervalSec());
- publishScheduledAt = PublishScheduledAt;
+ with_lock (ResourceManager->Lock) {
+ publishInterval = TDuration::Seconds(ResourceManager->Config.GetPublishStatisticsIntervalSec());
+ publishScheduledAt = ResourceManager->PublishScheduledAt;
}
if (publishScheduledAt) {
return;
}
- auto now = ActorSystem->Timestamp();
+ auto now = ResourceManager->ActorSystem->Timestamp();
if (publishInterval && WbState.LastPublishTime && now - *WbState.LastPublishTime < publishInterval) {
publishScheduledAt = *WbState.LastPublishTime + publishInterval;
- with_lock (Lock) {
- PublishScheduledAt = publishScheduledAt;
+ with_lock (ResourceManager->Lock) {
+ ResourceManager->PublishScheduledAt = publishScheduledAt;
}
Schedule(*publishScheduledAt - now, new TEvPrivate::TEvPublishResources);
@@ -934,15 +959,15 @@ private:
}
*proxyNodeResources = ProxyNodeResources;
ActorIdToProto(MakeKqpResourceManagerServiceID(SelfId().NodeId()), payload.MutableResourceManagerActorId()); // legacy
- with_lock (Lock) {
- payload.SetAvailableComputeActors(ExecutionUnitsResource.Available()); // legacy
- payload.SetTotalMemory(ScanQueryMemoryResource.GetLimit()); // legacy
- payload.SetUsedMemory(ScanQueryMemoryResource.GetLimit() - ScanQueryMemoryResource.Available()); // legacy
+ with_lock (ResourceManager->Lock) {
+ payload.SetAvailableComputeActors(ResourceManager->ExecutionUnitsResource.Available()); // legacy
+ payload.SetTotalMemory(ResourceManager->ScanQueryMemoryResource.GetLimit()); // legacy
+ payload.SetUsedMemory(ResourceManager->ScanQueryMemoryResource.GetLimit() - ResourceManager->ScanQueryMemoryResource.Available()); // legacy
- payload.SetExecutionUnits(ExecutionUnitsResource.Available());
+ payload.SetExecutionUnits(ResourceManager->ExecutionUnitsResource.Available());
auto* pool = payload.MutableMemory()->Add();
pool->SetPool(EKqpMemoryPool::ScanQuery);
- pool->SetAvailable(ScanQueryMemoryResource.Available());
+ pool->SetAvailable(ResourceManager->ScanQueryMemoryResource.Available());
}
auto boardPublisher = CreateBoardPublishActor(WbState.BoardPath, payload.SerializeAsString(), SelfId(),
@@ -957,26 +982,7 @@ private:
}
private:
- NKikimrConfig::TTableServiceConfig::TResourceManager Config; // guarded by Lock
- TIntrusivePtr<TKqpCounters> Counters;
const TActorId ResourceBrokerId;
- TIntrusivePtr<NResourceBroker::IResourceBroker> ResourceBroker;
- TActorSystem* ActorSystem = nullptr;
-
- // common guard
- TAdaptiveLock Lock;
-
- // limits (guarded by Lock)
- TLimitedResource<ui32> ExecutionUnitsResource;
- TLimitedResource<ui64> ScanQueryMemoryResource;
- ui64 ExternalDataQueryMemory = 0;
-
- // current state
- std::array<TTxStatesBucket, BucketsCount> Buckets;
- std::atomic<ui64> LastResourceBrokerTaskId = 0;
-
- // schedule info (guarded by Lock)
- std::optional<TInstant> PublishScheduledAt;
// Whiteboard specific fields
struct TWhiteBoardState {
@@ -988,13 +994,12 @@ private:
};
TWhiteBoardState WbState;
- // pattern cache for different actors
- std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache;
-
std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources;
NKikimrKqp::TKqpProxyNodeResources ProxyNodeResources;
TActorId WhiteBoardService;
+
+ std::shared_ptr<TKqpResourceManager> ResourceManager;
};
} // namespace NRm
@@ -1007,8 +1012,8 @@ NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServic
return new NRm::TKqpResourceManagerActor(config, counters, resourceBroker, std::move(kqpProxySharedResources));
}
-NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> _nodeId) {
- if (auto* rm = TryGetKqpResourceManager(_nodeId)) {
+std::shared_ptr<NRm::IKqpResourceManager> GetKqpResourceManager(TMaybe<ui32> _nodeId) {
+ if (auto rm = TryGetKqpResourceManager(_nodeId)) {
return rm;
}
@@ -1016,18 +1021,18 @@ NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> _nodeId) {
Y_FAIL("KqpResourceManager not ready yet, node #%" PRIu32, nodeId);
}
-NRm::IKqpResourceManager* TryGetKqpResourceManager(TMaybe<ui32> _nodeId) {
+std::shared_ptr<NRm::IKqpResourceManager> TryGetKqpResourceManager(TMaybe<ui32> _nodeId) {
ui32 nodeId = _nodeId ? *_nodeId : TActivationContext::ActorSystem()->NodeId;
- auto rm = NRm::ResourceManagers.Default.load(std::memory_order_acquire);
- if (Y_LIKELY(rm && rm->NodeId == nodeId)) {
- return rm->Instance;
+ auto rm = NRm::ResourceManagers.Default.lock();
+ if (Y_LIKELY(rm && rm->GetNodeId() == nodeId)) {
+ return rm;
}
// for tests only
with_lock (NRm::ResourceManagers.Lock) {
auto it = NRm::ResourceManagers.ByNodeId.find(nodeId);
if (it != NRm::ResourceManagers.ByNodeId.end()) {
- return it->second->Instance;
+ return it->second.lock();
}
}
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h
index 49f0d0db904..d77e9072526 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.h
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.h
@@ -91,6 +91,10 @@ public:
virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0;
virtual std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> GetPatternCache() = 0;
+
+ virtual ui32 GetNodeId() {
+ return 0;
+ }
};
@@ -108,8 +112,8 @@ NActors::IActor* CreateKqpResourceManagerActor(const NKikimrConfig::TTableServic
TIntrusivePtr<TKqpCounters> counters, NActors::TActorId resourceBroker = {},
std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources = nullptr);
-NRm::IKqpResourceManager* GetKqpResourceManager(TMaybe<ui32> nodeId = Nothing());
-NRm::IKqpResourceManager* TryGetKqpResourceManager(TMaybe<ui32> nodeId = Nothing());
+std::shared_ptr<NRm::IKqpResourceManager> GetKqpResourceManager(TMaybe<ui32> nodeId = Nothing());
+std::shared_ptr<NRm::IKqpResourceManager> TryGetKqpResourceManager(TMaybe<ui32> nodeId = Nothing());
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
index 9e8b1f3926f..7f77cc9d67a 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
@@ -148,7 +148,7 @@ public:
UNIT_ASSERT_VALUES_EQUAL(t->GetCounter("InFlyTasks")->Val(), infly);
}
- void AssertResourceManagerStats(const NRm::IKqpResourceManager* rm, ui64 scanQueryMemory, ui32 executionUnits) {
+ void AssertResourceManagerStats(std::shared_ptr<NRm::IKqpResourceManager> rm, ui64 scanQueryMemory, ui32 executionUnits) {
auto stats = rm->GetLocalResources();
UNIT_ASSERT_VALUES_EQUAL(scanQueryMemory, stats.Memory[NRm::EKqpMemoryPool::ScanQuery]);
UNIT_ASSERT_VALUES_EQUAL(executionUnits, stats.ExecutionUnits);
@@ -184,7 +184,7 @@ void KqpRm::SingleTask() {
CreateKqpResourceManager(MakeKqpResourceManagerConfig());
NKikimr::TActorSystemStub stub;
- auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -206,7 +206,7 @@ void KqpRm::ManyTasks() {
CreateKqpResourceManager(MakeKqpResourceManagerConfig());
NKikimr::TActorSystemStub stub;
- auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -244,7 +244,7 @@ void KqpRm::NotEnoughMemory() {
CreateKqpResourceManager(MakeKqpResourceManagerConfig());
NKikimr::TActorSystemStub stub;
- auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -262,7 +262,7 @@ void KqpRm::NotEnoughExecutionUnits() {
CreateKqpResourceManager(MakeKqpResourceManagerConfig());
NKikimr::TActorSystemStub stub;
- auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 1000;
@@ -282,7 +282,7 @@ void KqpRm::ResourceBrokerNotEnoughResources() {
CreateKqpResourceManager(config);
NKikimr::TActorSystemStub stub;
- auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
@@ -305,7 +305,7 @@ void KqpRm::Snapshot() {
CreateKqpResourceManager(MakeKqpResourceManagerConfig());
NKikimr::TActorSystemStub stub;
- auto* rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+ auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
NRm::TKqpResourcesRequest request;
request.ExecutionUnits = 10;
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 24111f604af..164351e7799 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -1055,7 +1055,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor
KqpExecCtx.ApplyCtx = KqpApplyCtx.Get();
KqpExecCtx.Alloc = KqpAlloc.Get();
KqpExecCtx.TypeEnv = KqpTypeEnv.Get();
- if (auto* rm = NKqp::TryGetKqpResourceManager()) {
+ if (auto rm = NKqp::TryGetKqpResourceManager()) {
KqpExecCtx.PatternCache = rm->GetPatternCache();
}
}