summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <[email protected]>2022-09-29 13:33:03 +0300
committergalaxycrab <[email protected]>2022-09-29 13:33:03 +0300
commit8421b01c80a333f52c006599b32b54b6a0a0bd0d (patch)
treee82821c7dad452c7d7e427d64dd50ebb02e04b3e
parent58bf216854fa84160309b5484114125d95bd4e26 (diff)
Hard limit on memory for one task
-rw-r--r--ydb/core/yq/libs/config/protos/resource_manager.proto9
-rw-r--r--ydb/core/yq/libs/init/init.cpp1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h18
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp16
-rw-r--r--ydb/library/yql/minikql/aligned_page_pool.h5
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp1
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h1
8 files changed, 41 insertions, 15 deletions
diff --git a/ydb/core/yq/libs/config/protos/resource_manager.proto b/ydb/core/yq/libs/config/protos/resource_manager.proto
index b25ef580bd6..1890e5082dc 100644
--- a/ydb/core/yq/libs/config/protos/resource_manager.proto
+++ b/ydb/core/yq/libs/config/protos/resource_manager.proto
@@ -7,8 +7,9 @@ option java_package = "ru.yandex.kikimr.proto";
////////////////////////////////////////////////////////////
message TResourceManagerConfig {
- bool Enabled = 1;
- uint64 MkqlInitialMemoryLimit = 2; // per task, default: 8_GB
- uint64 MkqlTotalMemoryLimit = 3; // per node, default: 0, means no limit management
- uint64 MkqlAllocSize = 4; // min alloc/free, default 30_MB
+ bool Enabled = 1;
+ uint64 MkqlInitialMemoryLimit = 2; // per task, default: 8_GB
+ uint64 MkqlTotalMemoryLimit = 3; // per node, default: 0, means no limit management
+ uint64 MkqlAllocSize = 4; // min alloc/free, default 30_MB
+ uint64 MkqlTaskHardMemoryLimit = 5; // per task. if it is exceeded, graph will be stoped
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 731e0dc5c7a..cca3261fec4 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -189,6 +189,7 @@ void Init(
lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory();
lwmOptions.MkqlInitialMemoryLimit = mkqlInitialMemoryLimit;
lwmOptions.MkqlTotalMemoryLimit = mkqlTotalMemoryLimit;
+ lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
[=](const NYql::NDqProto::TDqTask& task, const NYql::NDq::TLogFunc&) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index dd2152f9e76..44394ccd47f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -238,8 +238,9 @@ using TFreeMemoryCallback = std::function<void(const TTxId& txId, ui64 taskId, u
struct TComputeMemoryLimits {
ui64 ChannelBufferSize = 0;
- ui64 MkqlLightProgramMemoryLimit = 0;
- ui64 MkqlHeavyProgramMemoryLimit = 0;
+ ui64 MkqlLightProgramMemoryLimit = 0; // Limit for light program.
+ ui64 MkqlHeavyProgramMemoryLimit = 0; // Limit for heavy program.
+ ui64 MkqlProgramHardMemoryLimit = 0; // Limit that stops program execution if reached.
TAllocateMemoryCallback AllocateMemoryFn = nullptr;
TFreeMemoryCallback FreeMemoryFn = nullptr;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
index 86d4f979f65..f8c94daab63 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
@@ -4,6 +4,7 @@
#include <ydb/core/protos/services.pb.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
+#include <ydb/library/yql/minikql/aligned_page_pool.h>
#include <library/cpp/actors/core/log.h>
@@ -20,6 +21,9 @@ namespace NYql::NDq {
#define CAMQ_LOG_W(s) \
LOG_WARN_S(*ActorSystem, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", task: " << TaskId << ". " << s)
+ class THardMemoryLimitException : public NKikimr::TMemoryLimitExceededException {
+ };
+
class TDqMemoryQuota {
public:
TDqMemoryQuota(ui64 initialMkqlMemoryLimit, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NYql::NDq::TTxId txId, ui64 taskId, bool profileStats, bool canAllocateExtraMemory, NActors::TActorSystem* actorSystem)
@@ -29,7 +33,7 @@ namespace NYql::NDq {
, TxId(txId)
, TaskId(taskId)
, ProfileStats(profileStats ? MakeHolder<TProfileStats>() : nullptr)
- , CanAllocateExtraMemory(canAllocateExtraMemory)
+ , CanAllocateExtraMemory(canAllocateExtraMemory)
, ActorSystem(actorSystem) {
}
@@ -93,10 +97,18 @@ namespace NYql::NDq {
return CanAllocateExtraMemory;
}
+ ui64 GetHardMemoryLimit() const {
+ return MemoryLimits.MkqlProgramHardMemoryLimit;
+ }
+
private:
void RequestExtraMemory(ui64 memory, NKikimr::NMiniKQL::TScopedAlloc* alloc) {
memory = std::max(AlignMemorySizeToMbBoundary(memory), MemoryLimits.MinMemAllocSize);
+ if (MemoryLimits.MkqlProgramHardMemoryLimit && MkqlMemoryLimit + memory > MemoryLimits.MkqlProgramHardMemoryLimit) {
+ throw THardMemoryLimitException();
+ }
+
if (MemoryLimits.AllocateMemoryFn(TxId, TaskId, memory)) {
MkqlMemoryLimit += memory;
CAMQ_LOG_D("[Mem] memory " << memory << " granted, new limit: " << MkqlMemoryLimit);
@@ -112,7 +124,7 @@ namespace NYql::NDq {
ProfileStats->MkqlExtraMemoryRequests++;
}
}
-
+
ui64 AlignMemorySizeToMbBoundary(ui64 memory) {
// allocate memory in 1_MB (2^20B) chunks, so requested value is rounded up to MB boundary
constexpr ui64 alignMask = 1_MB - 1;
@@ -129,4 +141,4 @@ namespace NYql::NDq {
const bool CanAllocateExtraMemory;
NActors::TActorSystem* ActorSystem;
};
-} // namespace NYql::NDq \ No newline at end of file
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 7ac6dafaa6b..8f235feabf7 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -421,14 +421,20 @@ private:
ev->Cookie);
}
- THolder<TEvDq::TEvAbortExecution> GetError(const NKikimr::TMemoryLimitExceededException&) {
- const TString err = TStringBuilder() << "Mkql memory limit exceeded"
- << ", limit: " << (MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : -1)
- << ", canAllocateExtraMemory: " << (MemoryQuota ? MemoryQuota->GetCanAllocateExtraMemory() : 0);
+ THolder<TEvDq::TEvAbortExecution> GetError(const NKikimr::TMemoryLimitExceededException& e) {
+ const bool isHardLimit = dynamic_cast<const THardMemoryLimitException*>(&e) != nullptr;
+ TStringBuilder err;
+ err << "Mkql memory limit exceeded";
+ if (isHardLimit) {
+ err << ", hard limit: " << MemoryQuota->GetHardMemoryLimit();
+ } else {
+ err << ", limit: " << (MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : -1)
+ << ", canAllocateExtraMemory: " << (MemoryQuota ? MemoryQuota->GetCanAllocateExtraMemory() : 0);
+ }
LOG_E("TMemoryLimitExceededException: " << err);
TIssue issue(err);
SetIssueCode(TIssuesIds::KIKIMR_PRECONDITION_FAILED, issue);
- return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::OVERLOADED, TVector<TIssue>{issue});
+ return MakeHolder<TEvDq::TEvAbortExecution>(isHardLimit ? NYql::NDqProto::StatusIds::LIMIT_EXCEEDED : NYql::NDqProto::StatusIds::OVERLOADED, TVector<TIssue>{issue});
}
THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) {
diff --git a/ydb/library/yql/minikql/aligned_page_pool.h b/ydb/library/yql/minikql/aligned_page_pool.h
index ffef6f94083..93e4a1bd8b6 100644
--- a/ydb/library/yql/minikql/aligned_page_pool.h
+++ b/ydb/library/yql/minikql/aligned_page_pool.h
@@ -33,7 +33,10 @@ struct TAlignedPagePoolCounters {
// NOTE: We intentionally avoid inheritance from std::exception here to make it harder
// to catch this exception in UDFs code, so we can handle it in the host.
-class TMemoryLimitExceededException {};
+class TMemoryLimitExceededException {
+public:
+ virtual ~TMemoryLimitExceededException() = default;
+};
class TAlignedPagePool {
public:
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index f5742a492e3..64faded43b2 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -30,6 +30,7 @@ IActor* CreateComputeActor(
// light == heavy since we allow extra allocation
memoryLimits.MkqlLightProgramMemoryLimit = options.MkqlInitialMemoryLimit;
memoryLimits.MkqlHeavyProgramMemoryLimit = options.MkqlInitialMemoryLimit;
+ memoryLimits.MkqlProgramHardMemoryLimit = options.MkqlProgramHardMemoryLimit;
memoryLimits.AllocateMemoryFn = allocateMemoryFn;
memoryLimits.FreeMemoryFn = freeMemoryFn;
// min alloc size == min free size to simplify api
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index a772135afa0..f49e2c01ac5 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -34,6 +34,7 @@ namespace NYql::NDqs {
ui64 MkqlInitialMemoryLimit = 8_GB;
ui64 MkqlTotalMemoryLimit = 0;
ui64 MkqlMinAllocSize = 30_MB;
+ ui64 MkqlProgramHardMemoryLimit = 0;
bool CanUseComputeActor = true;
NActors::TActorId QuoterServiceActorId;