aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author hor911 <hor911@ydb.tech> 2023-05-22 20:06:49 +0300
committer hor911 <hor911@ydb.tech> 2023-05-22 20:06:49 +0300
commit 291d21bd2b1341583b714cd77e77a45821eb2e17 (patch)
tree 5580f7dd8aa04f3d78c51a31fe6182d6290bc5a6
parent 7a2f5cd02629fd9ba4332c870229c0760b8499d1 (diff)
download ydb-291d21bd2b1341583b714cd77e77a45821eb2e17.tar.gz
Accurate memory management
Сейчас в транке (и в YQ/YQL, и в KQP) при завершении работы CA уменьшение квоты выделенной памяти (в RM или LWM) происходит до момента разрушения TaskRunner и реального возврата памяти (MKQL) в систему. При высокой активности кластера (и особенно при автоматическом перезапуске запросов) возможно выделение этой же квоты другому CA и, как результат, OOM. Меняю интерфейс квотировщика и переношу момент возврата квоты на время после того, как уничтожены TaskRunner и все MKQL-аллокаторы. Понадобилось часть данных переложить в shared_ptr, иначе на shutdown-е (особенно в тестах) получались разные неприятные эффекты.
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_data_executer.cpp 15
-rw-r--r-- ydb/core/kqp/node_service/kqp_node_service.cpp 153
-rw-r--r-- ydb/core/kqp/node_service/kqp_node_state.h 2
-rw-r--r-- ydb/core/kqp/node_service/kqp_node_ut.cpp 76
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor.h 69
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h 48
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h 34
-rw-r--r-- ydb/library/yql/providers/dq/actors/compute_actor.cpp 6
-rw-r--r-- ydb/library/yql/providers/dq/actors/compute_actor.h 3
-rw-r--r-- ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp 47
10 files changed, 314 insertions, 139 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 444536dcd35..2adf967d565 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1601,15 +1601,12 @@ private:
limits.MkqlLightProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(500_MB, Request.MkqlMemoryLimit) : 500_MB;
limits.MkqlHeavyProgramMemoryLimit = Request.MkqlMemoryLimit > 0 ? std::min(2_GB, Request.MkqlMemoryLimit) : 2_GB;
- auto id = SelfId();
- limits.AllocateMemoryFn = [TxId = TxId, actorId = id](auto /* txId */, ui64 taskId, ui64 memory) {
- auto SelfId = [actorId] () {
- return actorId;
- };
- LOG_E("Data query task cannot allocate additional memory during executing."
- << " Task: " << taskId << ", memory: " << memory);
- return false;
- };
+ auto& taskOpts = taskDesc.GetProgram().GetSettings();
+ auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/
+ ? limits.MkqlHeavyProgramMemoryLimit
+ : limits.MkqlLightProgramMemoryLimit;
+
+ limits.MemoryQuotaManager = std::make_shared<TGuaranteeQuotaManager>(limit, limit);
auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), AsyncIoFactory,
AppData()->FunctionRegistry, settings, limits);
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 74864f8bc67..f45283e74b5 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -50,41 +50,88 @@ TString TasksIdsStr(const TTasksCollection& tasks) {
}
constexpr ui64 BucketsCount = 64;
+using TBucketArray = std::array<NKqpNode::TState, BucketsCount>;
-class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
- using TBase = TActorBootstrapped<TKqpNodeService>;
+NKqpNode::TState& GetStateBucketByTx(const std::shared_ptr<TBucketArray>& buckets, ui64 txId) { // borrows the array: no ownership is taken, so no refcount copy per lookup
+ return (*buckets)[txId % buckets->size()]; // state is sharded by TxId modulo the bucket count
+}
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::KQP_NODE_SERVICE;
+void FinishKqpTask(ui64 txId, ui64 taskId, bool success, NKqpNode::TState& bucket, const std::shared_ptr<NRm::IKqpResourceManager>& ResourceManager) { // removes the task from its bucket and releases its resources; the manager is only borrowed
+ auto ctx = bucket.RemoveTask(txId, taskId, success);
+ if (ctx) { // when RemoveTask() yields nothing for this task, nothing is freed
+ if (ctx->ComputeActorsNumber == 0) {
+ ResourceManager->FreeResources(txId); // no compute actors remain: release everything held for the transaction
+ } else {
+ ResourceManager->FreeResources(txId, taskId); // other compute actors remain: release only this task's resources
+ }
 }
+}
- static void FinishKqpTask(ui64 txId, ui64 taskId, bool success, const NYql::TIssues& issues,
- NKqpNode::TState& bucket) {
- LOG_D("TxId: " << txId << ", finish compute task: " << taskId << ", success: " << success
- << ", message: " << issues.ToOneLineString());
+struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { // KQP-side quota manager: extra memory comes from IKqpResourceManager, and the task is finished in the destructor
+
+ TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager
+ , NRm::EKqpMemoryPool memoryPool
+ , std::shared_ptr<TBucketArray> buckets
+ , ui64 txId
+ , ui64 taskId
+ , ui64 limit
+ , bool instantAlloc)
+ : NYql::NDq::TGuaranteeQuotaManager(limit, limit) // limit is passed as both the initial Limit and the Guarantee
+ , ResourceManager(std::move(resourceManager))
+ , MemoryPool(memoryPool)
+ , Buckets(std::move(buckets))
+ , TxId(txId)
+ , TaskId(taskId)
+ , InstantAlloc(instantAlloc) {
+ }
- auto ctx = bucket.RemoveTask(txId, taskId, success);
+ ~TMemoryQuotaManager() override { // task bookkeeping and resource release happen here, when the manager itself is destroyed
+ FinishKqpTask(TxId, TaskId, Success, GetStateBucketByTx(Buckets, TxId), ResourceManager); // Success is the value recorded by TerminateHandler(), or true if it was never called
+ }
- if (!ctx) {
- LOG_E("TxId: " << txId << ", task: " << taskId << " unknown task");
- return;
- }
+ bool AllocateExtraQuota(ui64 extraSize) override { // invoked by the base class when a request does not fit into the current Limit
- if (ctx->ComputeActorsNumber == 0) {
- LOG_D("TxId: " << txId << ", requester: " << ctx->Requester << " completed");
- GetKqpResourceManager()->FreeResources(txId);
- } else {
- LOG_D("TxId: " << txId << ", finish compute task: " << taskId
- << (success ? "" : " (cancelled)")
- << ", remains " << ctx->ComputeActorsNumber << " compute actors and "
- << ctx->TotalMemory << " bytes in the current request");
- GetKqpResourceManager()->FreeResources(txId, taskId);
+ if (!InstantAlloc) { // growth beyond the initial limit is refused unless instantAlloc was set at construction
+ LOG_W("Memory allocation prohibited. TxId: " << TxId << ", taskId: " << TaskId << ", memory: +" << extraSize);
+ return false;
 }
- if (ctx->FinixTx) {
- LOG_D("TxId: " << txId << ", requester: " << ctx->Requester << " completed");
+ if (!ResourceManager->AllocateResources(TxId, TaskId, // ask the resource manager for extraSize bytes from MemoryPool
+ NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize})) {
+ LOG_W("Can not allocate memory. TxId: " << TxId << ", taskId: " << TaskId << ", memory: +" << extraSize);
+ return false;
 }
+
+ return true;
+ }
+
+ void FreeExtraQuota(ui64 extraSize) override { // gives extraSize bytes of this task's memory back to the resource manager
+ ResourceManager->FreeResources(TxId, TaskId,
+ NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}
+ );
+ }
+
+ void TerminateHandler(bool success, const NYql::TIssues& issues) { // only logs and records the outcome; nothing is released here
+ LOG_D("TxId: " << TxId << ", finish compute task: " << TaskId << ", success: " << success
+ << ", message: " << issues.ToOneLineString());
+ Success = success;
+ }
+
+ std::shared_ptr<NRm::IKqpResourceManager> ResourceManager;
+ NRm::EKqpMemoryPool MemoryPool;
+ std::shared_ptr<TBucketArray> Buckets; // shared with the node service so the buckets are still reachable from the destructor
+ ui64 TxId;
+ ui64 TaskId;
+ bool InstantAlloc;
+ bool Success = true; // outcome handed to FinishKqpTask() on destruction
+};
+
+class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
+ using TBase = TActorBootstrapped<TKqpNodeService>;
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::KQP_NODE_SERVICE;
}
TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr<TKqpCounters>& counters,
@@ -93,7 +140,9 @@ public:
, Counters(counters)
, CaFactory(caFactory)
, AsyncIoFactory(std::move(asyncIoFactory))
- {}
+ {
+ Buckets = std::make_shared<TBucketArray>();
+ }
void Bootstrap() {
LOG_I("Starting KQP Node service");
@@ -229,7 +278,7 @@ private:
NKqpNode::TTasksRequest request;
request.Executer = ActorIdFromProto(msg.GetExecuterActorId());
- auto& bucket = GetStateBucketByTx(txId);
+ auto& bucket = GetStateBucketByTx(Buckets, txId);
if (bucket.Exists(txId, requester)) {
LOG_E("TxId: " << txId << ", requester: " << requester << ", request already exists");
@@ -334,20 +383,6 @@ private:
memoryLimits.ChannelBufferSize = 0;
memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit();
memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit();
- if (Config.GetEnableInstantMkqlMemoryAlloc()) {
- memoryLimits.AllocateMemoryFn = [rm = ResourceManager(), memoryPool](const auto& txId, ui64 taskId, ui64 memory) {
- NRm::TKqpResourcesRequest resources;
- resources.MemoryPool = memoryPool;
- resources.Memory = memory;
-
- if (rm->AllocateResources(std::get<ui64>(txId), taskId, resources)) {
- return true;
- }
-
- LOG_W("Can not allocate memory. TxId: " << txId << ", taskId: " << taskId << ", memory: +" << memory);
- return false;
- };
- }
NYql::NDq::TComputeRuntimeSettings runtimeSettingsBase;
auto& msgRtSettings = msg.GetRuntimeSettings();
@@ -384,10 +419,28 @@ private:
Y_VERIFY_DEBUG(memoryLimits.ChannelBufferSize >= Config.GetMinChannelBufferSize(),
"actual size: %ld, min: %ld", memoryLimits.ChannelBufferSize, Config.GetMinChannelBufferSize());
+ auto& taskOpts = dqTask.GetProgram().GetSettings();
+ auto limit = taskOpts.GetHasMapJoin() /* || opts.GetHasSort()*/
+ ? memoryLimits.MkqlHeavyProgramMemoryLimit
+ : memoryLimits.MkqlLightProgramMemoryLimit;
+
+ memoryLimits.MemoryQuotaManager = std::make_shared<TMemoryQuotaManager>(
+ ResourceManager(),
+ memoryPool,
+ Buckets,
+ txId,
+ dqTask.GetId(),
+ limit,
+ Config.GetEnableInstantMkqlMemoryAlloc());
+
auto runtimeSettings = runtimeSettingsBase;
- runtimeSettings.TerminateHandler = [txId, taskId = dqTask.GetId(), &bucket]
+ NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager;
+ runtimeSettings.TerminateHandler = [memoryQuotaManager]
(bool success, const NYql::TIssues& issues) {
- FinishKqpTask(txId, taskId, success, issues, bucket);
+ auto manager = memoryQuotaManager.lock();
+ if (manager) {
+ static_cast<TMemoryQuotaManager*>(manager.get())->TerminateHandler(success, issues);
+ }
};
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta;
@@ -449,7 +502,7 @@ private:
// used only for unit tests
void HandleWork(TEvKqpNode::TEvFinishKqpTask::TPtr& ev) {
auto& msg = *ev->Get();
- FinishKqpTask(msg.TxId, msg.TaskId, msg.Success, msg.Issues, GetStateBucketByTx(msg.TxId));
+ FinishKqpTask(msg.TxId, msg.TaskId, msg.Success, GetStateBucketByTx(Buckets, msg.TxId), GetKqpResourceManager());
}
void HandleWork(TEvKqpNode::TEvCancelKqpTasksRequest::TPtr& ev) {
@@ -461,7 +514,7 @@ private:
}
void TerminateTx(ui64 txId, const TString& reason) {
- auto& bucket = GetStateBucketByTx(txId);
+ auto& bucket = GetStateBucketByTx(Buckets, txId);
auto tasksToAbort = bucket.RemoveTx(txId);
if (!tasksToAbort.empty()) {
@@ -481,7 +534,7 @@ private:
void HandleWork(TEvents::TEvWakeup::TPtr& ev) {
Schedule(TDuration::Seconds(1), ev->Release().Release());
std::vector<ui64> txIdsToFree;
- for (auto& bucket : Buckets) {
+ for (auto& bucket : *Buckets) {
auto expiredRequests = bucket.ClearExpiredRequests();
for (auto& cxt : expiredRequests) {
LOG_D("txId: " << cxt.RequestId.TxId << ", requester: " << cxt.RequestId.Requester
@@ -566,7 +619,7 @@ private:
str << Endl;
str << Endl << "Transactions:" << Endl;
- for (auto& bucket : Buckets) {
+ for (auto& bucket : *Buckets) {
bucket.GetInfo(str);
}
}
@@ -598,10 +651,6 @@ private:
return ResourceManager_;
}
- NKqpNode::TState& GetStateBucketByTx(ui64 txId) {
- return Buckets[txId % Buckets.size()];
- }
-
private:
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
TIntrusivePtr<TKqpCounters> Counters;
@@ -610,7 +659,7 @@ private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
//state sharded by TxId
- std::array<NKqpNode::TState, BucketsCount> Buckets;
+ std::shared_ptr<TBucketArray> Buckets;
};
diff --git a/ydb/core/kqp/node_service/kqp_node_state.h b/ydb/core/kqp/node_service/kqp_node_state.h
index 45a6c4e9d50..c3ba09c91ea 100644
--- a/ydb/core/kqp/node_service/kqp_node_state.h
+++ b/ydb/core/kqp/node_service/kqp_node_state.h
@@ -52,7 +52,7 @@ struct TRemoveTaskContext {
class TState {
public:
struct TRequestId {
- ui64 TxId;
+ ui64 TxId = 0;
TActorId Requester;
};
diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp
index 43f3f8388e4..489e42454d9 100644
--- a/ydb/core/kqp/node_service/kqp_node_ut.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp
@@ -131,12 +131,19 @@ struct TMockKqpComputeActorFactory : public IKqpNodeComputeActorFactory {
mock.Task.Swap(&task);
mock.Settings = settings;
mock.MemoryLimits = memoryLimits;
+ UNIT_ASSERT(mock.MemoryLimits.MemoryQuotaManager->AllocateQuota(mock.MemoryLimits.MkqlLightProgramMemoryLimit));
+ static_cast<NYql::NDq::TGuaranteeQuotaManager*>(mock.MemoryLimits.MemoryQuotaManager.get())->Step = 1;
return Runtime.FindActor(actorId);
}
};
class KqpNode : public TTestBase {
public:
+
+ ~KqpNode() override {
+ CompFactory.Reset(); // NOTE(review): presumably releases the mock factory (and the MemoryLimits it stored) explicitly at fixture teardown — confirm the intended destruction order
+ }
+
void SetUp() override {
Runtime.Reset(new TTenantTestRuntime(MakeTenantTestConfig()));
@@ -371,8 +378,7 @@ void KqpNode::CommonCase() {
{
NKikimr::TActorSystemStub stub;
- auto& task4ExtraAlloc = CompFactory->Task2Actor[4].MemoryLimits.AllocateMemoryFn;
- bool allocated = task4ExtraAlloc(/* txId */ (ui64)2, /* taskId */ 4, /* memory */ 100);
+ bool allocated = CompFactory->Task2Actor[4].MemoryLimits.MemoryQuotaManager->AllocateQuota(100);
UNIT_ASSERT(allocated);
DispatchKqpNodePostponedEvents(sender1);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 4);
@@ -398,6 +404,12 @@ void KqpNode::CommonCase() {
}
AssertResourceBrokerSensors(0, 0, 0, 5, 0);
+
+ {
+ NKikimr::TActorSystemStub stub;
+
+ CompFactory->Task2Actor.clear();
+ }
}
void KqpNode::ExtraAllocation() {
@@ -414,13 +426,11 @@ void KqpNode::ExtraAllocation() {
SendStartTasksRequest(sender1, /* txId */ 1, /* taskIds */ {1, 2});
Runtime->GrabEdgeEvent<TEvKqpNode::TEvStartKqpTasksResponse>(sender1);
- auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn;
-
// memory granted
{
NKikimr::TActorSystemStub stub;
- bool allocated = task1ExtraAlloc(/* txId */ (ui64)1, /* taskId */ 1, /* memory */ 100);
+ bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(100);
UNIT_ASSERT(allocated);
DispatchKqpNodePostponedEvents(sender1);
@@ -433,7 +443,7 @@ void KqpNode::ExtraAllocation() {
{
NKikimr::TActorSystemStub stub;
- bool allocated = task1ExtraAlloc(/* txId */ (ui64)1, /* taskId */ 1, /* memory */ 50'000);
+ bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(50'000);
UNIT_ASSERT(!allocated);
DispatchKqpNodePostponedEvents(sender1);
@@ -443,6 +453,12 @@ void KqpNode::ExtraAllocation() {
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0);
AssertResourceBrokerSensors(0, taskSize2 + 100, 0, 1, 2);
}
+
+ {
+ NKikimr::TActorSystemStub stub;
+
+ CompFactory->Task2Actor.clear();
+ }
}
void KqpNode::NotEnoughMemory() {
@@ -504,8 +520,7 @@ void KqpNode::NotEnoughMemory_Extra() {
{
NKikimr::TActorSystemStub stub;
- auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn;
- bool allocated = task1ExtraAlloc((ui64)1, 1, 1'000'000);
+ bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(1'000'000);
UNIT_ASSERT(!allocated);
}
@@ -517,6 +532,12 @@ void KqpNode::NotEnoughMemory_Extra() {
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0);
AssertResourceBrokerSensors(0, taskSize2, 0, 0, 2);
+
+ {
+ NKikimr::TActorSystemStub stub;
+
+ CompFactory->Task2Actor.clear();
+ }
}
void KqpNode::NotEnoughComputeActors() {
@@ -539,6 +560,12 @@ void KqpNode::NotEnoughComputeActors() {
}
AssertResourceBrokerSensors(0, 0, 0, 4, 0);
+
+ {
+ NKikimr::TActorSystemStub stub;
+
+ CompFactory->Task2Actor.clear();
+ }
}
void KqpNode::ResourceBrokerNotEnoughResources() {
@@ -570,6 +597,12 @@ void KqpNode::ResourceBrokerNotEnoughResources() {
}
AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 1, 2);
+
+ {
+ NKikimr::TActorSystemStub stub;
+
+ CompFactory->Task2Actor.clear();
+ }
}
void KqpNode::ResourceBrokerNotEnoughResources_Extra() {
@@ -590,12 +623,17 @@ void KqpNode::ResourceBrokerNotEnoughResources_Extra() {
{
NKikimr::TActorSystemStub stub;
- auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn;
- bool allocated = task1ExtraAlloc((ui64)1, 1, 2 * (6000 * 2 + 1000 / 2));
+ bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(2 * (6000 * 2 + 1000 / 2));
UNIT_ASSERT(!allocated);
}
AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 0, 2);
+
+ {
+ NKikimr::TActorSystemStub stub;
+
+ CompFactory->Task2Actor.clear();
+ }
}
void KqpNode::ExecuterLost() {
@@ -609,8 +647,7 @@ void KqpNode::ExecuterLost() {
{
NKikimr::TActorSystemStub stub;
- auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn;
- bool allocated = task1ExtraAlloc((ui64)1, 1, 100);
+ bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(100);
UNIT_ASSERT(allocated);
DispatchKqpNodePostponedEvents(sender1);
}
@@ -632,6 +669,12 @@ void KqpNode::ExecuterLost() {
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0);
AssertResourceBrokerSensors(0, 0, 0, 3, 0);
+
+ {
+ NKikimr::TActorSystemStub stub;
+
+ CompFactory->Task2Actor.clear();
+ }
}
void KqpNode::TerminateTx() {
@@ -649,8 +692,7 @@ void KqpNode::TerminateTx() {
{
NKikimr::TActorSystemStub stub;
- auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn;
- bool allocated = task1ExtraAlloc((ui64)1, 1, 100);
+ bool allocated = CompFactory->Task2Actor[1].MemoryLimits.MemoryQuotaManager->AllocateQuota(100);
UNIT_ASSERT(allocated);
DispatchKqpNodePostponedEvents(sender1);
}
@@ -683,6 +725,12 @@ void KqpNode::TerminateTx() {
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0);
AssertResourceBrokerSensors(0, 0, 0, 4, 0);
+
+ {
+ NKikimr::TActorSystemStub stub;
+
+ CompFactory->Task2Actor.clear();
+ }
}
} // namespace NKqp
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 40bce04634e..5da25141ad8 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -232,8 +232,69 @@ struct TComputeRuntimeSettings {
TMaybe<NDqProto::TRlPath> RlPath;
};
-using TAllocateMemoryCallback = std::function<bool(const TTxId& txId, ui64 taskId, ui64 memory)>;
-using TFreeMemoryCallback = std::function<void(const TTxId& txId, ui64 taskId, ui64 memory)>;
+struct IMemoryQuotaManager { // Interface through which compute-actor code requests and returns memory quota
+ using TPtr = std::shared_ptr<IMemoryQuotaManager>;
+ using TWeakPtr = std::weak_ptr<IMemoryQuotaManager>;
+ virtual ~IMemoryQuotaManager() = default;
+ virtual bool AllocateQuota(ui64 memorySize) = 0; // true when memorySize was granted, false when it was refused
+ virtual void FreeQuota(ui64 memorySize) = 0; // gives back memorySize of previously granted quota
+ virtual ui64 GetCurrentQuota() const = 0; // amount of quota currently granted
+};
+
+struct TGuaranteeQuotaManager : public IMemoryQuotaManager { // Quota manager with a guaranteed floor: FreeQuota() never shrinks Limit below Guarantee
+ // Limit grows through AllocateExtraQuota() and shrinks through FreeExtraQuota(), always in multiples of Step.
+ TGuaranteeQuotaManager(ui64 limit, ui64 guarantee, ui64 step = 1_MB, ui64 quota = 0)
+ : Limit(limit), Guarantee(guarantee), Step(step), Quota(quota) {
+ Y_VERIFY(Limit >= Guarantee);
+ Y_VERIFY(Limit >= Quota);
+ Y_VERIFY(Step != 0 && (Step & (Step - 1)) == 0); // Step must be a power of two: the Step - 1 masks below rely on it
+ }
+
+ bool AllocateQuota(ui64 memorySize) override { // returns false, leaving all counters untouched, when the extra quota is refused
+ if (Quota + memorySize > Limit) {
+ ui64 delta = Quota + memorySize - Limit;
+ const ui64 alignMask = Step - 1;
+ delta = (delta + alignMask) & ~alignMask; // round the shortfall up to a multiple of Step
+
+ if (!AllocateExtraQuota(delta)) {
+ return false;
+ }
+
+ Limit += delta;
+ }
+
+ Quota += memorySize;
+ return true;
+ }
+
+ void FreeQuota(ui64 memorySize) override { // aborts via Y_VERIFY if more is freed than is currently held
+ Y_VERIFY(Quota >= memorySize);
+ Quota -= memorySize;
+ ui64 delta = Limit - std::max(Quota, Guarantee); // headroom above what is either in use or guaranteed
+ if (delta >= Step) {
+ const ui64 alignMask = Step - 1;
+ delta &= ~alignMask; // give back only whole Step-sized chunks
+ FreeExtraQuota(delta);
+ Limit -= delta;
+ }
+ }
+
+ ui64 GetCurrentQuota() const override {
+ return Quota;
+ }
+
+ virtual bool AllocateExtraQuota(ui64) { // default: Limit cannot grow
+ return false;
+ }
+
+ virtual void FreeExtraQuota(ui64) { // default: nothing to hand back
+ }
+
+ ui64 Limit; // current ceiling for Quota
+ ui64 Guarantee; // floor below which FreeQuota() does not shrink Limit
+ ui64 Step; // granularity of Limit changes; must be a power of two
+ ui64 Quota; // amount currently handed out
+};
struct TComputeMemoryLimits {
ui64 ChannelBufferSize = 0;
@@ -241,10 +302,10 @@ struct TComputeMemoryLimits {
ui64 MkqlHeavyProgramMemoryLimit = 0; // Limit for heavy program.
ui64 MkqlProgramHardMemoryLimit = 0; // Limit that stops program execution if reached.
- TAllocateMemoryCallback AllocateMemoryFn = nullptr;
- TFreeMemoryCallback FreeMemoryFn = nullptr;
ui64 MinMemAllocSize = 30_MB;
ui64 MinMemFreeSize = 30_MB;
+
+ IMemoryQuotaManager::TPtr MemoryQuotaManager;
};
using TTaskRunnerFactory = std::function<
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 6fb88ba70d7..1fb29ed6abe 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -184,7 +184,7 @@ protected:
, Task(std::move(task))
, RuntimeSettings(settings)
, MemoryLimits(memoryLimits)
- , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
+ , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0)
, AsyncIoFactory(std::move(asyncIoFactory))
, FunctionRegistry(functionRegistry)
, CheckpointingMode(GetTaskCheckpointingMode(Task))
@@ -218,7 +218,7 @@ protected:
, Task(task)
, RuntimeSettings(settings)
, MemoryLimits(memoryLimits)
- , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
+ , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0)
, AsyncIoFactory(std::move(asyncIoFactory))
, FunctionRegistry(functionRegistry)
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
@@ -323,22 +323,29 @@ protected:
}
void DoExecute() {
- auto guard = BindAllocator();
- auto* alloc = guard.GetMutex();
+ {
+ auto guard = BindAllocator();
+ auto* alloc = guard.GetMutex();
- if (State == NDqProto::COMPUTE_STATE_FINISHED) {
- if (!DoHandleChannelsAfterFinishImpl()) {
- return;
+ if (State == NDqProto::COMPUTE_STATE_FINISHED) {
+ if (!DoHandleChannelsAfterFinishImpl()) {
+ return;
+ }
+ } else {
+ DoExecuteImpl();
}
- } else {
- DoExecuteImpl();
- }
- if (MemoryQuota) {
- MemoryQuota->TryShrinkMemory(alloc);
- }
+ if (MemoryQuota) {
+ MemoryQuota->TryShrinkMemory(alloc);
+ }
- ReportStats(TInstant::Now());
+ ReportStats(TInstant::Now());
+ }
+ if (Terminated) {
+ TaskRunner.Reset();
+ MemoryQuota.Reset();
+ MemoryLimits.MemoryQuotaManager.reset();
+ }
}
virtual void DoExecuteImpl() {
@@ -545,10 +552,6 @@ protected:
}
}
- if (RuntimeSettings.TerminateHandler) {
- RuntimeSettings.TerminateHandler(success, issues);
- }
-
{
if (guard) {
// free MKQL memory then destroy TaskRunner and Allocator
@@ -564,8 +567,12 @@ protected:
}
}
+ if (RuntimeSettings.TerminateHandler) {
+ RuntimeSettings.TerminateHandler(success, issues);
+ }
+
this->PassAway();
- MemoryQuota = nullptr;
+ Terminated = true;
}
void Terminate(bool success, const TString& message) {
@@ -2094,7 +2101,7 @@ protected:
const NDqProto::TDqTask Task;
TString LogPrefix;
const TComputeRuntimeSettings RuntimeSettings;
- const TComputeMemoryLimits MemoryLimits;
+ TComputeMemoryLimits MemoryLimits;
const bool CanAllocateExtraMemory = false;
const IDqAsyncIoFactory::TPtr AsyncIoFactory;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
@@ -2137,6 +2144,7 @@ private:
bool Running = true;
TInstant LastSendStatsTime;
bool PassExceptions = false;
+ bool Terminated = false;
protected:
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota;
::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
index 8bff90633b0..b7604b2b2ef 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
@@ -36,9 +36,11 @@ namespace NYql::NDq {
, ProfileStats(profileStats ? MakeHolder<TProfileStats>() : nullptr)
, CanAllocateExtraMemory(canAllocateExtraMemory)
, ActorSystem(actorSystem) {
- if (MkqlMemoryQuota) {
- MkqlMemoryQuota->Add(MkqlMemoryLimit);
- }
+
+ Y_VERIFY(MemoryLimits.MemoryQuotaManager->AllocateQuota(MkqlMemoryLimit));
+ if (MkqlMemoryQuota) {
+ MkqlMemoryQuota->Add(MkqlMemoryLimit);
+ }
}
ui64 GetMkqlMemoryLimit() const {
@@ -56,18 +58,16 @@ namespace NYql::NDq {
void TryShrinkMemory(NKikimr::NMiniKQL::TScopedAlloc* alloc) {
if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) {
alloc->ReleaseFreePages();
- if (MemoryLimits.FreeMemoryFn) {
- auto newLimit = std::max(alloc->GetAllocated(), InitialMkqlMemoryLimit);
- if (MkqlMemoryLimit > newLimit) {
- auto freedSize = MkqlMemoryLimit - newLimit;
- MkqlMemoryLimit = newLimit;
- alloc->SetLimit(newLimit);
- MemoryLimits.FreeMemoryFn(TxId, TaskId, freedSize);
- if (MkqlMemoryQuota) {
- MkqlMemoryQuota->Sub(freedSize);
- }
- CAMQ_LOG_D("[Mem] memory shrinked, new limit: " << MkqlMemoryLimit);
+ auto newLimit = std::max(alloc->GetAllocated(), InitialMkqlMemoryLimit);
+ if (MkqlMemoryLimit > newLimit) {
+ auto freedSize = MkqlMemoryLimit - newLimit;
+ MkqlMemoryLimit = newLimit;
+ alloc->SetLimit(newLimit);
+ MemoryLimits.MemoryQuotaManager->FreeQuota(freedSize);
+ if (MkqlMemoryQuota) {
+ MkqlMemoryQuota->Sub(freedSize);
}
+ CAMQ_LOG_D("[Mem] memory shrinked, new limit: " << MkqlMemoryLimit);
}
}
@@ -94,8 +94,8 @@ namespace NYql::NDq {
}
void TryReleaseQuota() {
- if (MkqlMemoryLimit && MemoryLimits.FreeMemoryFn) {
- MemoryLimits.FreeMemoryFn(TxId, TaskId, MkqlMemoryLimit);
+ if (MkqlMemoryLimit) {
+ MemoryLimits.MemoryQuotaManager->FreeQuota(MkqlMemoryLimit);
if (MkqlMemoryQuota) {
MkqlMemoryQuota->Sub(MkqlMemoryLimit);
}
@@ -119,7 +119,7 @@ namespace NYql::NDq {
throw THardMemoryLimitException();
}
- if (MemoryLimits.AllocateMemoryFn(TxId, TaskId, memory)) {
+ if (MemoryLimits.MemoryQuotaManager->AllocateQuota(memory)) {
MkqlMemoryLimit += memory;
if (MkqlMemoryQuota) {
MkqlMemoryQuota->Add(memory);
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index 0508c9a964c..0c8e03e2854 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -16,8 +16,7 @@ using namespace NDqs;
IActor* CreateComputeActor(
const TLocalWorkerManagerOptions& options,
- NDq::TAllocateMemoryCallback allocateMemoryFn,
- NDq::TFreeMemoryCallback freeMemoryFn,
+ NDq::IMemoryQuotaManager::TPtr memoryQuotaManager,
const TActorId& executerId,
const TString& operationId,
NYql::NDqProto::TDqTask&& task,
@@ -31,8 +30,7 @@ IActor* CreateComputeActor(
memoryLimits.MkqlLightProgramMemoryLimit = options.MkqlInitialMemoryLimit;
memoryLimits.MkqlHeavyProgramMemoryLimit = options.MkqlInitialMemoryLimit;
memoryLimits.MkqlProgramHardMemoryLimit = options.MkqlProgramHardMemoryLimit;
- memoryLimits.AllocateMemoryFn = allocateMemoryFn;
- memoryLimits.FreeMemoryFn = freeMemoryFn;
+ memoryLimits.MemoryQuotaManager = memoryQuotaManager;
// min alloc size == min free size to simplify api
memoryLimits.MinMemAllocSize = options.MkqlMinAllocSize;
memoryLimits.MinMemFreeSize = options.MkqlMinAllocSize;
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.h b/ydb/library/yql/providers/dq/actors/compute_actor.h
index b0eaf4f6880..ea4c691bb8d 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.h
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.h
@@ -7,8 +7,7 @@ namespace NYql {
NActors::IActor* CreateComputeActor(
const NYql::NDqs::TLocalWorkerManagerOptions& options,
- NDq::TAllocateMemoryCallback allocateMemoryFn,
- NDq::TFreeMemoryCallback freeMemoryFn,
+ NDq::IMemoryQuotaManager::TPtr memoryQuotaManager,
const NActors::TActorId& executerId,
const TString& operationId,
NYql::NDqProto::TDqTask&& task,
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index 7da4c641583..13ffc497ee4 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -5,6 +5,7 @@
#include <ydb/library/yql/providers/dq/actors/compute_actor.h>
#include <ydb/library/yql/providers/dq/actors/worker_actor.h>
#include <ydb/library/yql/providers/dq/runtime/runtime_data.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
#include <ydb/library/yql/dq/common/dq_resource_quoter.h>
@@ -37,6 +38,32 @@ union TDqLocalResourceId {
static_assert(sizeof(TDqLocalResourceId) == 8);
+struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager { // Quota manager for one compute actor, backed by the shared TResourceQuoter
+ // limit is passed to the base class as both the initial Limit and the Guarantee.
+ TMemoryQuotaManager(std::shared_ptr<NDq::TResourceQuoter> nodeQuoter, const NDq::TTxId& txId, ui64 limit)
+ : NYql::NDq::TGuaranteeQuotaManager(limit, limit)
+ , NodeQuoter(std::move(nodeQuoter)) // sink parameter: move it instead of copying the shared_ptr a second time
+ , TxId(txId) {
+ }
+
+ ~TMemoryQuotaManager() override {
+ if (Limit) {
+ NodeQuoter->Free(TxId, 0, Limit); // return the whole current Limit (initial limit plus any extra granted) to the quoter
+ }
+ }
+
+ bool AllocateExtraQuota(ui64 extraSize) override {
+ return NodeQuoter->Allocate(TxId, 0, extraSize); // task id 0: memory per task is not tracked yet
+ }
+
+ void FreeExtraQuota(ui64 extraSize) override {
+ NodeQuoter->Free(TxId, 0, extraSize); // task id 0: memory per task is not tracked yet
+ }
+
+ std::shared_ptr<NDq::TResourceQuoter> NodeQuoter;
+ NDq::TTxId TxId;
+};
+
class TLocalWorkerManager: public TWorkerManagerCommon<TLocalWorkerManager> {
public:
@@ -54,16 +81,6 @@ public:
limitCounter->Set(limit);
allocatedCounter->Set(allocated);
});
-
- AllocateMemoryFn = [quoter = MemoryQuoter](const auto& txId, ui64, ui64 size) {
- // mem per task is not tracked yet
- return quoter->Allocate(txId, 0, size);
- };
-
- FreeMemoryFn = [quoter = MemoryQuoter](const auto& txId, ui64, ui64 size) {
- // mem per task is not tracked yet
- quoter->Free(txId, 0, size);
- };
}
private:
@@ -218,13 +235,16 @@ private:
auto& tasks = *ev->Get()->Record.MutableTask();
ui64 totalInitialTaskMemoryLimit = 0;
+ std::vector<ui64> quotas;
if (createComputeActor) {
Y_VERIFY(static_cast<int>(tasks.size()) == static_cast<int>(count));
+ quotas.reserve(count);
for (auto& task : tasks) {
auto taskLimit = task.GetInitialTaskMemoryLimit();
if (taskLimit == 0) {
taskLimit = Options.MkqlInitialMemoryLimit;
}
+ quotas.push_back(taskLimit);
totalInitialTaskMemoryLimit += taskLimit;
}
} else {
@@ -268,8 +288,7 @@ private:
actor.Reset(NYql::CreateComputeActor(
Options,
- Options.MkqlTotalMemoryLimit ? AllocateMemoryFn : nullptr,
- Options.MkqlTotalMemoryLimit ? FreeMemoryFn : nullptr,
+ std::make_shared<TMemoryQuotaManager>(MemoryQuoter, allocationInfo.TxId, quotas[i]),
resultId,
traceId,
std::move(tasks[i]),
@@ -322,8 +341,6 @@ private:
YQL_CLOG(ERROR, ProviderDq) << "Free Group " << id << " mismatched alloc-free senders: " << it->second.Sender << " and " << sender << " TxId: " << it->second.TxId;
}
- MemoryQuoter->Free(it->second.TxId, 0);
-
auto traceId = std::get<TString>(it->second.TxId);
auto itt = TaskCountersMap.find(traceId);
if (itt != TaskCountersMap.end()) {
@@ -369,8 +386,6 @@ private:
TRusage Rusage;
- NDq::TAllocateMemoryCallback AllocateMemoryFn;
- NDq::TFreeMemoryCallback FreeMemoryFn;
std::shared_ptr<NDq::TResourceQuoter> MemoryQuoter;
struct TCountersInfo {