diff options
author | hor911 <hor911@ydb.tech> | 2023-05-22 20:06:49 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-05-22 20:06:49 +0300 |
commit | 291d21bd2b1341583b714cd77e77a45821eb2e17 (patch) | |
tree | 5580f7dd8aa04f3d78c51a31fe6182d6290bc5a6 | |
parent | 7a2f5cd02629fd9ba4332c870229c0760b8499d1 (diff) | |
download | ydb-291d21bd2b1341583b714cd77e77a45821eb2e17.tar.gz |
Accurate memory management
Сейчас в транке (и в YQ/YQL, и в KQP) при завершении работы CA уменьшение квоты выделенной памяти (в RM или LWM) происходит до момента разрушения TaskRunner и реального возврата памяти (MKQL) в систему. При высокой активности кластера (и особенно при автоматическом перезапуске запросов) возможно выделение этой же квоты другому CA и, как результат, OOM.
Меняю интерфейс квотировщика и переношу момент возврата квоты на время после того, как уничтожены TaskRunner и все MKQL-аллокаторы. Понадобилось часть данных переложить в shared_ptr, иначе на shutdown-е (особенно в тестах) получались разные неприятные эффекты.
10 files changed, 314 insertions, 139 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 444536dcd35..2adf967d565 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1601,15 +1601,12 @@ private: limits.MkqlLightProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(500_MB, Request.MkqlMemoryLimit) : 500_MB; limits.MkqlHeavyProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(2_GB, Request.MkqlMemoryLimit) : 2_GB; - auto id = SelfId(); - limits.AllocateMemoryFn = [TxId = TxId, actorId = id](auto /* txId */, ui64 taskId, ui64 memory) { - auto SelfId = [actorId] () { - return actorId; - }; - LOG_E("Data query task cannot allocate additional memory during executing." - << " Task: " << taskId << ", memory: " << memory); - return false; - }; + auto& taskOpts = taskDesc.GetProgram().GetSettings(); + auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/ + ? limits.MkqlHeavyProgramMemoryLimit + : limits.MkqlLightProgramMemoryLimit; + + limits.MemoryQuotaManager = std::make_shared<TGuaranteeQuotaManager>(limit, limit); auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), AsyncIoFactory, AppData()->FunctionRegistry, settings, limits); diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 74864f8bc67..f45283e74b5 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -50,41 +50,88 @@ TString TasksIdsStr(const TTasksCollection& tasks) { } constexpr ui64 BucketsCount = 64; +using TBucketArray = std::array<NKqpNode::TState, BucketsCount>; -class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> { - using TBase = TActorBootstrapped<TKqpNodeService>; +NKqpNode::TState& GetStateBucketByTx(std::shared_ptr<TBucketArray> buckets, ui64 txId) { + return (*buckets)[txId % buckets->size()]; +} -public: - static 
constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::KQP_NODE_SERVICE; +void FinishKqpTask(ui64 txId, ui64 taskId, bool success, NKqpNode::TState& bucket, std::shared_ptr<NRm::IKqpResourceManager> ResourceManager) { + auto ctx = bucket.RemoveTask(txId, taskId, success); + if (ctx) { + if (ctx->ComputeActorsNumber == 0) { + ResourceManager->FreeResources(txId); + } else { + ResourceManager->FreeResources(txId, taskId); + } } +} - static void FinishKqpTask(ui64 txId, ui64 taskId, bool success, const NYql::TIssues& issues, - NKqpNode::TState& bucket) { - LOG_D("TxId: " << txId << ", finish compute task: " << taskId << ", success: " << success - << ", message: " << issues.ToOneLineString()); +struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { + + TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager + , NRm::EKqpMemoryPool memoryPool + , std::shared_ptr<TBucketArray> buckets + , ui64 txId + , ui64 taskId + , ui64 limit + , bool instantAlloc) + : NYql::NDq::TGuaranteeQuotaManager(limit, limit) + , ResourceManager(std::move(resourceManager)) + , MemoryPool(memoryPool) + , Buckets(std::move(buckets)) + , TxId(txId) + , TaskId(taskId) + , InstantAlloc(instantAlloc) { + } - auto ctx = bucket.RemoveTask(txId, taskId, success); + ~TMemoryQuotaManager() override { + FinishKqpTask(TxId, TaskId, Success, GetStateBucketByTx(Buckets, TxId), ResourceManager); + } - if (!ctx) { - LOG_E("TxId: " << txId << ", task: " << taskId << " unknown task"); - return; - } + bool AllocateExtraQuota(ui64 extraSize) override { - if (ctx->ComputeActorsNumber == 0) { - LOG_D("TxId: " << txId << ", requester: " << ctx->Requester << " completed"); - GetKqpResourceManager()->FreeResources(txId); - } else { - LOG_D("TxId: " << txId << ", finish compute task: " << taskId - << (success ? 
"" : " (cancelled)") - << ", remains " << ctx->ComputeActorsNumber << " compute actors and " - << ctx->TotalMemory << " bytes in the current request"); - GetKqpResourceManager()->FreeResources(txId, taskId); + if (!InstantAlloc) { + LOG_W("Memory allocation prohibited. TxId: " << TxId << ", taskId: " << TaskId << ", memory: +" << extraSize); + return false; } - if (ctx->FinixTx) { - LOG_D("TxId: " << txId << ", requester: " << ctx->Requester << " completed"); + if (!ResourceManager->AllocateResources(TxId, TaskId, + NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize})) { + LOG_W("Can not allocate memory. TxId: " << TxId << ", taskId: " << TaskId << ", memory: +" << extraSize); + return false; } + + return true; + } + + void FreeExtraQuota(ui64 extraSize) override { + ResourceManager->FreeResources(TxId, TaskId, + NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize} + ); + } + + void TerminateHandler(bool success, const NYql::TIssues& issues) { + LOG_D("TxId: " << TxId << ", finish compute task: " << TaskId << ", success: " << success + << ", message: " << issues.ToOneLineString()); + Success = success; + } + + std::shared_ptr<NRm::IKqpResourceManager> ResourceManager; + NRm::EKqpMemoryPool MemoryPool; + std::shared_ptr<TBucketArray> Buckets; + ui64 TxId; + ui64 TaskId; + bool InstantAlloc; + bool Success = true; +}; + +class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> { + using TBase = TActorBootstrapped<TKqpNodeService>; + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::KQP_NODE_SERVICE; } TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr<TKqpCounters>& counters, @@ -93,7 +140,9 @@ public: , Counters(counters) , CaFactory(caFactory) , AsyncIoFactory(std::move(asyncIoFactory)) - {} + { + Buckets = std::make_shared<TBucketArray>(); + } void Bootstrap() { LOG_I("Starting KQP Node service"); @@ -229,7 
+278,7 @@ private: NKqpNode::TTasksRequest request; request.Executer = ActorIdFromProto(msg.GetExecuterActorId()); - auto& bucket = GetStateBucketByTx(txId); + auto& bucket = GetStateBucketByTx(Buckets, txId); if (bucket.Exists(txId, requester)) { LOG_E("TxId: " << txId << ", requester: " << requester << ", request already exists"); @@ -334,20 +383,6 @@ private: memoryLimits.ChannelBufferSize = 0; memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit(); memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit(); - if (Config.GetEnableInstantMkqlMemoryAlloc()) { - memoryLimits.AllocateMemoryFn = [rm = ResourceManager(), memoryPool](const auto& txId, ui64 taskId, ui64 memory) { - NRm::TKqpResourcesRequest resources; - resources.MemoryPool = memoryPool; - resources.Memory = memory; - - if (rm->AllocateResources(std::get<ui64>(txId), taskId, resources)) { - return true; - } - - LOG_W("Can not allocate memory. TxId: " << txId << ", taskId: " << taskId << ", memory: +" << memory); - return false; - }; - } NYql::NDq::TComputeRuntimeSettings runtimeSettingsBase; auto& msgRtSettings = msg.GetRuntimeSettings(); @@ -384,10 +419,28 @@ private: Y_VERIFY_DEBUG(memoryLimits.ChannelBufferSize >= Config.GetMinChannelBufferSize(), "actual size: %ld, min: %ld", memoryLimits.ChannelBufferSize, Config.GetMinChannelBufferSize()); + auto& taskOpts = dqTask.GetProgram().GetSettings(); + auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/ + ? 
memoryLimits.MkqlHeavyProgramMemoryLimit + : memoryLimits.MkqlLightProgramMemoryLimit; + + memoryLimits.MemoryQuotaManager = std::make_shared<TMemoryQuotaManager>( + ResourceManager(), + memoryPool, + Buckets, + txId, + dqTask.GetId(), + limit, + Config.GetEnableInstantMkqlMemoryAlloc()); + auto runtimeSettings = runtimeSettingsBase; - runtimeSettings.TerminateHandler = [txId, taskId = dqTask.GetId(), &bucket] + NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager; + runtimeSettings.TerminateHandler = [memoryQuotaManager] (bool success, const NYql::TIssues& issues) { - FinishKqpTask(txId, taskId, success, issues, bucket); + auto manager = memoryQuotaManager.lock(); + if (manager) { + static_cast<TMemoryQuotaManager*>(manager.get())->TerminateHandler(success, issues); + } }; NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta; @@ -449,7 +502,7 @@ private: // used only for unit tests void HandleWork(TEvKqpNode::TEvFinishKqpTask::TPtr& ev) { auto& msg = *ev->Get(); - FinishKqpTask(msg.TxId, msg.TaskId, msg.Success, msg.Issues, GetStateBucketByTx(msg.TxId)); + FinishKqpTask(msg.TxId, msg.TaskId, msg.Success, GetStateBucketByTx(Buckets, msg.TxId), GetKqpResourceManager()); } void HandleWork(TEvKqpNode::TEvCancelKqpTasksRequest::TPtr& ev) { @@ -461,7 +514,7 @@ private: } void TerminateTx(ui64 txId, const TString& reason) { - auto& bucket = GetStateBucketByTx(txId); + auto& bucket = GetStateBucketByTx(Buckets, txId); auto tasksToAbort = bucket.RemoveTx(txId); if (!tasksToAbort.empty()) { @@ -481,7 +534,7 @@ private: void HandleWork(TEvents::TEvWakeup::TPtr& ev) { Schedule(TDuration::Seconds(1), ev->Release().Release()); std::vector<ui64> txIdsToFree; - for (auto& bucket : Buckets) { + for (auto& bucket : *Buckets) { auto expiredRequests = bucket.ClearExpiredRequests(); for (auto& cxt : expiredRequests) { LOG_D("txId: " << cxt.RequestId.TxId << ", requester: " << cxt.RequestId.Requester @@ -566,7 +619,7 @@ private: str << Endl; 
str << Endl << "Transactions:" << Endl; - for (auto& bucket : Buckets) { + for (auto& bucket : *Buckets) { bucket.GetInfo(str); } } @@ -598,10 +651,6 @@ private: return ResourceManager_; } - NKqpNode::TState& GetStateBucketByTx(ui64 txId) { - return Buckets[txId % Buckets.size()]; - } - private: NKikimrConfig::TTableServiceConfig::TResourceManager Config; TIntrusivePtr<TKqpCounters> Counters; @@ -610,7 +659,7 @@ private: NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; //state sharded by TxId - std::array<NKqpNode::TState, BucketsCount> Buckets; + std::shared_ptr<TBucketArray> Buckets; }; diff --git a/ydb/core/kqp/node_service/kqp_node_state.h b/ydb/core/kqp/node_service/kqp_node_state.h index 45a6c4e9d50..c3ba09c91ea 100644 --- a/ydb/core/kqp/node_service/kqp_node_state.h +++ b/ydb/core/kqp/node_service/kqp_node_state.h @@ -52,7 +52,7 @@ struct TRemoveTaskContext { class TState { public: struct TRequestId { - ui64 TxId; + ui64 TxId = 0; TActorId Requester; }; diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index 43f3f8388e4..489e42454d9 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -131,12 +131,19 @@ struct TMockKqpComputeActorFactory : public IKqpNodeComputeActorFactory { mock.Task.Swap(&task); mock.Settings = settings; mock.MemoryLimits = memoryLimits; + UNIT_ASSERT(mock.MemoryLimits.MemoryQuotaManager->AllocateQuota(mock.MemoryLimits.MkqlLightProgramMemoryLimit)); + static_cast<NYql::NDq::TGuaranteeQuotaManager*>(mock.MemoryLimits.MemoryQuotaManager.get())->Step = 1; return Runtime.FindActor(actorId); } }; class KqpNode : public TTestBase { public: + + ~KqpNode() override { + CompFactory.Reset(); + } + void SetUp() override { Runtime.Reset(new TTenantTestRuntime(MakeTenantTestConfig())); @@ -371,8 +378,7 @@ void KqpNode::CommonCase() { { NKikimr::TActorSystemStub stub; - auto& task4ExtraAlloc = 
CompFactory->Task2Actor[4].MemoryLimits.AllocateMemoryFn; - bool allocated = task4ExtraAlloc(/* txId */ (ui64)2, /* taskId */ 4, /* memory */ 100); + bool allocated = CompFactory->Task2Actor[4].MemoryLimits.MemoryQuotaManager->AllocateQuota(100); UNIT_ASSERT(allocated); DispatchKqpNodePostponedEvents(sender1); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 4); @@ -398,6 +404,12 @@ void KqpNode::CommonCase() { } AssertResourceBrokerSensors(0, 0, 0, 5, 0); + + { + NKikimr::TActorSystemStub stub; + + CompFactory->Task2Actor.clear(); + } } void KqpNode::ExtraAllocation() { @@ -414,13 +426,11 @@ void KqpNode::ExtraAllocation() { SendStartTasksRequest(sender1, /* txId */ 1, /* taskIds */ {1, 2}); Runtime->GrabEdgeEvent<TEvKqpNode::TEvStartKqpTasksResponse>(sender1); - auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn; - // memory granted { NKikimr::TActorSystemStub stub; - bool allocated = task1ExtraAlloc(/* txId */ (ui64)1, /* taskId */ 1, /* memory */ 100); + bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(100); UNIT_ASSERT(allocated); DispatchKqpNodePostponedEvents(sender1); @@ -433,7 +443,7 @@ void KqpNode::ExtraAllocation() { { NKikimr::TActorSystemStub stub; - bool allocated = task1ExtraAlloc(/* txId */ (ui64)1, /* taskId */ 1, /* memory */ 50'000); + bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(50'000); UNIT_ASSERT(!allocated); DispatchKqpNodePostponedEvents(sender1); @@ -443,6 +453,12 @@ void KqpNode::ExtraAllocation() { UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0); AssertResourceBrokerSensors(0, taskSize2 + 100, 0, 1, 2); } + + { + NKikimr::TActorSystemStub stub; + + CompFactory->Task2Actor.clear(); + } } void KqpNode::NotEnoughMemory() { @@ -504,8 +520,7 @@ void KqpNode::NotEnoughMemory_Extra() { { NKikimr::TActorSystemStub stub; - auto& task1ExtraAlloc = 
CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn; - bool allocated = task1ExtraAlloc((ui64)1, 1, 1'000'000); + bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(1'000'000); UNIT_ASSERT(!allocated); } @@ -517,6 +532,12 @@ void KqpNode::NotEnoughMemory_Extra() { UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0); AssertResourceBrokerSensors(0, taskSize2, 0, 0, 2); + + { + NKikimr::TActorSystemStub stub; + + CompFactory->Task2Actor.clear(); + } } void KqpNode::NotEnoughComputeActors() { @@ -539,6 +560,12 @@ void KqpNode::NotEnoughComputeActors() { } AssertResourceBrokerSensors(0, 0, 0, 4, 0); + + { + NKikimr::TActorSystemStub stub; + + CompFactory->Task2Actor.clear(); + } } void KqpNode::ResourceBrokerNotEnoughResources() { @@ -570,6 +597,12 @@ void KqpNode::ResourceBrokerNotEnoughResources() { } AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 1, 2); + + { + NKikimr::TActorSystemStub stub; + + CompFactory->Task2Actor.clear(); + } } void KqpNode::ResourceBrokerNotEnoughResources_Extra() { @@ -590,12 +623,17 @@ void KqpNode::ResourceBrokerNotEnoughResources_Extra() { { NKikimr::TActorSystemStub stub; - auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn; - bool allocated = task1ExtraAlloc((ui64)1, 1, 2 * (6000 * 2 + 1000 / 2)); + bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(2 * (6000 * 2 + 1000 / 2)); UNIT_ASSERT(!allocated); } AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 0, 2); + + { + NKikimr::TActorSystemStub stub; + + CompFactory->Task2Actor.clear(); + } } void KqpNode::ExecuterLost() { @@ -609,8 +647,7 @@ void KqpNode::ExecuterLost() { { NKikimr::TActorSystemStub stub; - auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn; - bool allocated = task1ExtraAlloc((ui64)1, 1, 100); + bool allocated = 
CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(100); UNIT_ASSERT(allocated); DispatchKqpNodePostponedEvents(sender1); } @@ -632,6 +669,12 @@ void KqpNode::ExecuterLost() { UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0); AssertResourceBrokerSensors(0, 0, 0, 3, 0); + + { + NKikimr::TActorSystemStub stub; + + CompFactory->Task2Actor.clear(); + } } void KqpNode::TerminateTx() { @@ -649,8 +692,7 @@ void KqpNode::TerminateTx() { { NKikimr::TActorSystemStub stub; - auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn; - bool allocated = task1ExtraAlloc((ui64)1, 1, 100); + bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(100); UNIT_ASSERT(allocated); DispatchKqpNodePostponedEvents(sender1); } @@ -683,6 +725,12 @@ void KqpNode::TerminateTx() { UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0); AssertResourceBrokerSensors(0, 0, 0, 4, 0); + + { + NKikimr::TActorSystemStub stub; + + CompFactory->Task2Actor.clear(); + } } } // namespace NKqp diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 40bce04634e..5da25141ad8 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -232,8 +232,69 @@ struct TComputeRuntimeSettings { TMaybe<NDqProto::TRlPath> RlPath; }; -using TAllocateMemoryCallback = std::function<bool(const TTxId& txId, ui64 taskId, ui64 memory)>; -using TFreeMemoryCallback = std::function<void(const TTxId& txId, ui64 taskId, ui64 memory)>; +struct IMemoryQuotaManager { + using TPtr = std::shared_ptr<IMemoryQuotaManager>; + using TWeakPtr = std::weak_ptr<IMemoryQuotaManager>; + virtual ~IMemoryQuotaManager() = default; + virtual bool AllocateQuota(ui64 memorySize) = 0; + virtual void FreeQuota(ui64 memorySize) = 0; + virtual ui64 GetCurrentQuota() const = 0; +}; + +struct 
TGuaranteeQuotaManager : public IMemoryQuotaManager { + + TGuaranteeQuotaManager(ui64 limit, ui64 guarantee, ui64 step = 1_MB, ui64 quota = 0) + : Limit(limit), Guarantee(guarantee), Step(step), Quota(quota) { + Y_VERIFY(Limit >= Guarantee); + Y_VERIFY(Limit >= Quota); + Y_VERIFY((Step ^ ~Step) + 1 == 0); + } + + bool AllocateQuota(ui64 memorySize) override { + if (Quota + memorySize > Limit) { + ui64 delta = Quota + memorySize - Limit; + ui64 alignMask = Step - 1; + delta = (delta + alignMask) & ~alignMask; + + if (!AllocateExtraQuota(delta)) { + return false; + } + + Limit += delta; + } + + Quota += memorySize; + return true; + } + + void FreeQuota(ui64 memorySize) override { + Y_VERIFY(Quota >= memorySize); + Quota -= memorySize; + ui64 delta = Limit - std::max(Quota, Guarantee); + if (delta >= Step) { + ui64 alignMask = Step - 1; + delta &= ~alignMask; + FreeExtraQuota(delta); + Limit -= delta; + } + } + + ui64 GetCurrentQuota() const override { + return Quota; + } + + virtual bool AllocateExtraQuota(ui64) { + return false; + } + + virtual void FreeExtraQuota(ui64) { + } + + ui64 Limit; + ui64 Guarantee; + ui64 Step; + ui64 Quota; +}; struct TComputeMemoryLimits { ui64 ChannelBufferSize = 0; @@ -241,10 +302,10 @@ struct TComputeMemoryLimits { ui64 MkqlHeavyProgramMemoryLimit = 0; // Limit for heavy program. ui64 MkqlProgramHardMemoryLimit = 0; // Limit that stops program execution if reached. 
- TAllocateMemoryCallback AllocateMemoryFn = nullptr; - TFreeMemoryCallback FreeMemoryFn = nullptr; ui64 MinMemAllocSize = 30_MB; ui64 MinMemFreeSize = 30_MB; + + IMemoryQuotaManager::TPtr MemoryQuotaManager; }; using TTaskRunnerFactory = std::function< diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 6fb88ba70d7..1fb29ed6abe 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -184,7 +184,7 @@ protected: , Task(std::move(task)) , RuntimeSettings(settings) , MemoryLimits(memoryLimits) - , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) + , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0) , AsyncIoFactory(std::move(asyncIoFactory)) , FunctionRegistry(functionRegistry) , CheckpointingMode(GetTaskCheckpointingMode(Task)) @@ -218,7 +218,7 @@ protected: , Task(task) , RuntimeSettings(settings) , MemoryLimits(memoryLimits) - , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) + , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0) , AsyncIoFactory(std::move(asyncIoFactory)) , FunctionRegistry(functionRegistry) , State(Task.GetCreateSuspended() ? 
NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) @@ -323,22 +323,29 @@ protected: } void DoExecute() { - auto guard = BindAllocator(); - auto* alloc = guard.GetMutex(); + { + auto guard = BindAllocator(); + auto* alloc = guard.GetMutex(); - if (State == NDqProto::COMPUTE_STATE_FINISHED) { - if (!DoHandleChannelsAfterFinishImpl()) { - return; + if (State == NDqProto::COMPUTE_STATE_FINISHED) { + if (!DoHandleChannelsAfterFinishImpl()) { + return; + } + } else { + DoExecuteImpl(); } - } else { - DoExecuteImpl(); - } - if (MemoryQuota) { - MemoryQuota->TryShrinkMemory(alloc); - } + if (MemoryQuota) { + MemoryQuota->TryShrinkMemory(alloc); + } - ReportStats(TInstant::Now()); + ReportStats(TInstant::Now()); + } + if (Terminated) { + TaskRunner.Reset(); + MemoryQuota.Reset(); + MemoryLimits.MemoryQuotaManager.reset(); + } } virtual void DoExecuteImpl() { @@ -545,10 +552,6 @@ protected: } } - if (RuntimeSettings.TerminateHandler) { - RuntimeSettings.TerminateHandler(success, issues); - } - { if (guard) { // free MKQL memory then destroy TaskRunner and Allocator @@ -564,8 +567,12 @@ protected: } } + if (RuntimeSettings.TerminateHandler) { + RuntimeSettings.TerminateHandler(success, issues); + } + this->PassAway(); - MemoryQuota = nullptr; + Terminated = true; } void Terminate(bool success, const TString& message) { @@ -2094,7 +2101,7 @@ protected: const NDqProto::TDqTask Task; TString LogPrefix; const TComputeRuntimeSettings RuntimeSettings; - const TComputeMemoryLimits MemoryLimits; + TComputeMemoryLimits MemoryLimits; const bool CanAllocateExtraMemory = false; const IDqAsyncIoFactory::TPtr AsyncIoFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; @@ -2137,6 +2144,7 @@ private: bool Running = true; TInstant LastSendStatsTime; bool PassExceptions = false; + bool Terminated = false; protected: ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota; ::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize; diff 
--git a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h index 8bff90633b0..b7604b2b2ef 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h @@ -36,9 +36,11 @@ namespace NYql::NDq { , ProfileStats(profileStats ? MakeHolder<TProfileStats>() : nullptr) , CanAllocateExtraMemory(canAllocateExtraMemory) , ActorSystem(actorSystem) { - if (MkqlMemoryQuota) { - MkqlMemoryQuota->Add(MkqlMemoryLimit); - } + + Y_VERIFY(MemoryLimits.MemoryQuotaManager->AllocateQuota(MkqlMemoryLimit)); + if (MkqlMemoryQuota) { + MkqlMemoryQuota->Add(MkqlMemoryLimit); + } } ui64 GetMkqlMemoryLimit() const { @@ -56,18 +58,16 @@ namespace NYql::NDq { void TryShrinkMemory(NKikimr::NMiniKQL::TScopedAlloc* alloc) { if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) { alloc->ReleaseFreePages(); - if (MemoryLimits.FreeMemoryFn) { - auto newLimit = std::max(alloc->GetAllocated(), InitialMkqlMemoryLimit); - if (MkqlMemoryLimit > newLimit) { - auto freedSize = MkqlMemoryLimit - newLimit; - MkqlMemoryLimit = newLimit; - alloc->SetLimit(newLimit); - MemoryLimits.FreeMemoryFn(TxId, TaskId, freedSize); - if (MkqlMemoryQuota) { - MkqlMemoryQuota->Sub(freedSize); - } - CAMQ_LOG_D("[Mem] memory shrinked, new limit: " << MkqlMemoryLimit); + auto newLimit = std::max(alloc->GetAllocated(), InitialMkqlMemoryLimit); + if (MkqlMemoryLimit > newLimit) { + auto freedSize = MkqlMemoryLimit - newLimit; + MkqlMemoryLimit = newLimit; + alloc->SetLimit(newLimit); + MemoryLimits.MemoryQuotaManager->FreeQuota(freedSize); + if (MkqlMemoryQuota) { + MkqlMemoryQuota->Sub(freedSize); } + CAMQ_LOG_D("[Mem] memory shrinked, new limit: " << MkqlMemoryLimit); } } @@ -94,8 +94,8 @@ namespace NYql::NDq { } void TryReleaseQuota() { - if (MkqlMemoryLimit && MemoryLimits.FreeMemoryFn) { - MemoryLimits.FreeMemoryFn(TxId, TaskId, MkqlMemoryLimit); + if 
(MkqlMemoryLimit) { + MemoryLimits.MemoryQuotaManager->FreeQuota(MkqlMemoryLimit); if (MkqlMemoryQuota) { MkqlMemoryQuota->Sub(MkqlMemoryLimit); } @@ -119,7 +119,7 @@ namespace NYql::NDq { throw THardMemoryLimitException(); } - if (MemoryLimits.AllocateMemoryFn(TxId, TaskId, memory)) { + if (MemoryLimits.MemoryQuotaManager->AllocateQuota(memory)) { MkqlMemoryLimit += memory; if (MkqlMemoryQuota) { MkqlMemoryQuota->Add(memory); diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index 0508c9a964c..0c8e03e2854 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -16,8 +16,7 @@ using namespace NDqs; IActor* CreateComputeActor( const TLocalWorkerManagerOptions& options, - NDq::TAllocateMemoryCallback allocateMemoryFn, - NDq::TFreeMemoryCallback freeMemoryFn, + NDq::IMemoryQuotaManager::TPtr memoryQuotaManager, const TActorId& executerId, const TString& operationId, NYql::NDqProto::TDqTask&& task, @@ -31,8 +30,7 @@ IActor* CreateComputeActor( memoryLimits.MkqlLightProgramMemoryLimit = options.MkqlInitialMemoryLimit; memoryLimits.MkqlHeavyProgramMemoryLimit = options.MkqlInitialMemoryLimit; memoryLimits.MkqlProgramHardMemoryLimit = options.MkqlProgramHardMemoryLimit; - memoryLimits.AllocateMemoryFn = allocateMemoryFn; - memoryLimits.FreeMemoryFn = freeMemoryFn; + memoryLimits.MemoryQuotaManager = memoryQuotaManager; // min alloc size == min free size to simplify api memoryLimits.MinMemAllocSize = options.MkqlMinAllocSize; memoryLimits.MinMemFreeSize = options.MkqlMinAllocSize; diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.h b/ydb/library/yql/providers/dq/actors/compute_actor.h index b0eaf4f6880..ea4c691bb8d 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.h +++ b/ydb/library/yql/providers/dq/actors/compute_actor.h @@ -7,8 +7,7 @@ namespace NYql { NActors::IActor* CreateComputeActor( const 
NYql::NDqs::TLocalWorkerManagerOptions& options, - NDq::TAllocateMemoryCallback allocateMemoryFn, - NDq::TFreeMemoryCallback freeMemoryFn, + NDq::IMemoryQuotaManager::TPtr memoryQuotaManager, const NActors::TActorId& executerId, const TString& operationId, NYql::NDqProto::TDqTask&& task, diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index 7da4c641583..13ffc497ee4 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/providers/dq/actors/compute_actor.h> #include <ydb/library/yql/providers/dq/actors/worker_actor.h> #include <ydb/library/yql/providers/dq/runtime/runtime_data.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h> #include <ydb/library/yql/dq/common/dq_resource_quoter.h> @@ -37,6 +38,32 @@ union TDqLocalResourceId { static_assert(sizeof(TDqLocalResourceId) == 8); +struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { + + TMemoryQuotaManager(std::shared_ptr<NDq::TResourceQuoter> nodeQuoter, const NDq::TTxId& txId, ui64 limit) + : NYql::NDq::TGuaranteeQuotaManager(limit, limit) + , NodeQuoter(nodeQuoter) + , TxId(txId) { + } + + ~TMemoryQuotaManager() override { + if (Limit) { + NodeQuoter->Free(TxId, 0, Limit); + } + } + + bool AllocateExtraQuota(ui64 extraSize) override { + return NodeQuoter->Allocate(TxId, 0, extraSize); + } + + void FreeExtraQuota(ui64 extraSize) override { + NodeQuoter->Free(TxId, 0, extraSize); + } + + std::shared_ptr<NDq::TResourceQuoter> NodeQuoter; + NDq::TTxId TxId; +}; + class TLocalWorkerManager: public TWorkerManagerCommon<TLocalWorkerManager> { public: @@ -54,16 +81,6 @@ public: limitCounter->Set(limit); allocatedCounter->Set(allocated); }); - - AllocateMemoryFn = [quoter = 
MemoryQuoter](const auto& txId, ui64, ui64 size) { - // mem per task is not tracked yet - return quoter->Allocate(txId, 0, size); - }; - - FreeMemoryFn = [quoter = MemoryQuoter](const auto& txId, ui64, ui64 size) { - // mem per task is not tracked yet - quoter->Free(txId, 0, size); - }; } private: @@ -218,13 +235,16 @@ private: auto& tasks = *ev->Get()->Record.MutableTask(); ui64 totalInitialTaskMemoryLimit = 0; + std::vector<ui64> quotas; if (createComputeActor) { Y_VERIFY(static_cast<int>(tasks.size()) == static_cast<int>(count)); + quotas.reserve(count); for (auto& task : tasks) { auto taskLimit = task.GetInitialTaskMemoryLimit(); if (taskLimit == 0) { taskLimit = Options.MkqlInitialMemoryLimit; } + quotas.push_back(taskLimit); totalInitialTaskMemoryLimit += taskLimit; } } else { @@ -268,8 +288,7 @@ private: actor.Reset(NYql::CreateComputeActor( Options, - Options.MkqlTotalMemoryLimit ? AllocateMemoryFn : nullptr, - Options.MkqlTotalMemoryLimit ? FreeMemoryFn : nullptr, + std::make_shared<TMemoryQuotaManager>(MemoryQuoter, allocationInfo.TxId, quotas[i]), resultId, traceId, std::move(tasks[i]), @@ -322,8 +341,6 @@ private: YQL_CLOG(ERROR, ProviderDq) << "Free Group " << id << " mismatched alloc-free senders: " << it->second.Sender << " and " << sender << " TxId: " << it->second.TxId; } - MemoryQuoter->Free(it->second.TxId, 0); - auto traceId = std::get<TString>(it->second.TxId); auto itt = TaskCountersMap.find(traceId); if (itt != TaskCountersMap.end()) { @@ -369,8 +386,6 @@ private: TRusage Rusage; - NDq::TAllocateMemoryCallback AllocateMemoryFn; - NDq::TFreeMemoryCallback FreeMemoryFn; std::shared_ptr<NDq::TResourceQuoter> MemoryQuoter; struct TCountersInfo { |