diff options
| author | galaxycrab <[email protected]> | 2022-09-29 13:33:03 +0300 |
|---|---|---|
| committer | galaxycrab <[email protected]> | 2022-09-29 13:33:03 +0300 |
| commit | 8421b01c80a333f52c006599b32b54b6a0a0bd0d (patch) | |
| tree | e82821c7dad452c7d7e427d64dd50ebb02e04b3e | |
| parent | 58bf216854fa84160309b5484114125d95bd4e26 (diff) | |
Hard limit on memory for one task
8 files changed, 41 insertions, 15 deletions
diff --git a/ydb/core/yq/libs/config/protos/resource_manager.proto b/ydb/core/yq/libs/config/protos/resource_manager.proto index b25ef580bd6..1890e5082dc 100644 --- a/ydb/core/yq/libs/config/protos/resource_manager.proto +++ b/ydb/core/yq/libs/config/protos/resource_manager.proto @@ -7,8 +7,9 @@ option java_package = "ru.yandex.kikimr.proto"; //////////////////////////////////////////////////////////// message TResourceManagerConfig { - bool Enabled = 1; - uint64 MkqlInitialMemoryLimit = 2; // per task, default: 8_GB - uint64 MkqlTotalMemoryLimit = 3; // per node, default: 0, means no limit management - uint64 MkqlAllocSize = 4; // min alloc/free, default 30_MB + bool Enabled = 1; + uint64 MkqlInitialMemoryLimit = 2; // per task, default: 8_GB + uint64 MkqlTotalMemoryLimit = 3; // per node, default: 0, means no limit management + uint64 MkqlAllocSize = 4; // min alloc/free, default 30_MB + uint64 MkqlTaskHardMemoryLimit = 5; // per task. if it is exceeded, graph will be stoped } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 731e0dc5c7a..cca3261fec4 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -189,6 +189,7 @@ void Init( lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory(); lwmOptions.MkqlInitialMemoryLimit = mkqlInitialMemoryLimit; lwmOptions.MkqlTotalMemoryLimit = mkqlTotalMemoryLimit; + lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit(); lwmOptions.MkqlMinAllocSize = mkqlAllocSize; lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( [=](const NYql::NDqProto::TDqTask& task, const NYql::NDq::TLogFunc&) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index dd2152f9e76..44394ccd47f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -238,8 +238,9 @@ using TFreeMemoryCallback = std::function<void(const TTxId& txId, ui64 taskId, u struct TComputeMemoryLimits { ui64 ChannelBufferSize = 0; - ui64 MkqlLightProgramMemoryLimit = 0; - ui64 MkqlHeavyProgramMemoryLimit = 0; + ui64 MkqlLightProgramMemoryLimit = 0; // Limit for light program. + ui64 MkqlHeavyProgramMemoryLimit = 0; // Limit for heavy program. + ui64 MkqlProgramHardMemoryLimit = 0; // Limit that stops program execution if reached. TAllocateMemoryCallback AllocateMemoryFn = nullptr; TFreeMemoryCallback FreeMemoryFn = nullptr; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h index 86d4f979f65..f8c94daab63 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h @@ -4,6 +4,7 @@ #include <ydb/core/protos/services.pb.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/library/yql/minikql/mkql_alloc.h> +#include <ydb/library/yql/minikql/aligned_page_pool.h> #include <library/cpp/actors/core/log.h> @@ -20,6 +21,9 @@ namespace NYql::NDq { #define CAMQ_LOG_W(s) \ LOG_WARN_S(*ActorSystem, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", task: " << TaskId << ". " << s) + class THardMemoryLimitException : public NKikimr::TMemoryLimitExceededException { + }; + class TDqMemoryQuota { public: TDqMemoryQuota(ui64 initialMkqlMemoryLimit, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NYql::NDq::TTxId txId, ui64 taskId, bool profileStats, bool canAllocateExtraMemory, NActors::TActorSystem* actorSystem) @@ -29,7 +33,7 @@ namespace NYql::NDq { , TxId(txId) , TaskId(taskId) , ProfileStats(profileStats ? MakeHolder<TProfileStats>() : nullptr) - , CanAllocateExtraMemory(canAllocateExtraMemory) + , CanAllocateExtraMemory(canAllocateExtraMemory) , ActorSystem(actorSystem) { } @@ -93,10 +97,18 @@ namespace NYql::NDq { return CanAllocateExtraMemory; } + ui64 GetHardMemoryLimit() const { + return MemoryLimits.MkqlProgramHardMemoryLimit; + } + private: void RequestExtraMemory(ui64 memory, NKikimr::NMiniKQL::TScopedAlloc* alloc) { memory = std::max(AlignMemorySizeToMbBoundary(memory), MemoryLimits.MinMemAllocSize); + if (MemoryLimits.MkqlProgramHardMemoryLimit && MkqlMemoryLimit + memory > MemoryLimits.MkqlProgramHardMemoryLimit) { + throw THardMemoryLimitException(); + } + if (MemoryLimits.AllocateMemoryFn(TxId, TaskId, memory)) { MkqlMemoryLimit += memory; CAMQ_LOG_D("[Mem] memory " << memory << " granted, new limit: " << MkqlMemoryLimit); @@ -112,7 +124,7 @@ namespace NYql::NDq { ProfileStats->MkqlExtraMemoryRequests++; } } - + ui64 AlignMemorySizeToMbBoundary(ui64 memory) { // allocate memory in 1_MB (2^20B) chunks, so requested value is rounded up to MB boundary constexpr ui64 alignMask = 1_MB - 1; @@ -129,4 +141,4 @@ namespace NYql::NDq { const bool CanAllocateExtraMemory; NActors::TActorSystem* ActorSystem; }; -} // namespace NYql::NDq
\ No newline at end of file +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 7ac6dafaa6b..8f235feabf7 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -421,14 +421,20 @@ private: ev->Cookie); } - THolder<TEvDq::TEvAbortExecution> GetError(const NKikimr::TMemoryLimitExceededException&) { - const TString err = TStringBuilder() << "Mkql memory limit exceeded" - << ", limit: " << (MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : -1) - << ", canAllocateExtraMemory: " << (MemoryQuota ? MemoryQuota->GetCanAllocateExtraMemory() : 0); + THolder<TEvDq::TEvAbortExecution> GetError(const NKikimr::TMemoryLimitExceededException& e) { + const bool isHardLimit = dynamic_cast<const THardMemoryLimitException*>(&e) != nullptr; + TStringBuilder err; + err << "Mkql memory limit exceeded"; + if (isHardLimit) { + err << ", hard limit: " << MemoryQuota->GetHardMemoryLimit(); + } else { + err << ", limit: " << (MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : -1) + << ", canAllocateExtraMemory: " << (MemoryQuota ? MemoryQuota->GetCanAllocateExtraMemory() : 0); + } LOG_E("TMemoryLimitExceededException: " << err); TIssue issue(err); SetIssueCode(TIssuesIds::KIKIMR_PRECONDITION_FAILED, issue); - return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::OVERLOADED, TVector<TIssue>{issue}); + return MakeHolder<TEvDq::TEvAbortExecution>(isHardLimit ? NYql::NDqProto::StatusIds::LIMIT_EXCEEDED : NYql::NDqProto::StatusIds::OVERLOADED, TVector<TIssue>{issue}); } THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) { diff --git a/ydb/library/yql/minikql/aligned_page_pool.h b/ydb/library/yql/minikql/aligned_page_pool.h index ffef6f94083..93e4a1bd8b6 100644 --- a/ydb/library/yql/minikql/aligned_page_pool.h +++ b/ydb/library/yql/minikql/aligned_page_pool.h @@ -33,7 +33,10 @@ struct TAlignedPagePoolCounters { // NOTE: We intentionally avoid inheritance from std::exception here to make it harder // to catch this exception in UDFs code, so we can handle it in the host. -class TMemoryLimitExceededException {}; +class TMemoryLimitExceededException { +public: + virtual ~TMemoryLimitExceededException() = default; +}; class TAlignedPagePool { public: diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index f5742a492e3..64faded43b2 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -30,6 +30,7 @@ IActor* CreateComputeActor( // light == heavy since we allow extra allocation memoryLimits.MkqlLightProgramMemoryLimit = options.MkqlInitialMemoryLimit; memoryLimits.MkqlHeavyProgramMemoryLimit = options.MkqlInitialMemoryLimit; + memoryLimits.MkqlProgramHardMemoryLimit = options.MkqlProgramHardMemoryLimit; memoryLimits.AllocateMemoryFn = allocateMemoryFn; memoryLimits.FreeMemoryFn = freeMemoryFn; // min alloc size == min free size to simplify api diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index a772135afa0..f49e2c01ac5 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -34,6 +34,7 @@ namespace NYql::NDqs { ui64 MkqlInitialMemoryLimit = 8_GB; ui64 MkqlTotalMemoryLimit = 0; ui64 MkqlMinAllocSize = 30_MB; + ui64 MkqlProgramHardMemoryLimit = 0; bool CanUseComputeActor = true; NActors::TActorId QuoterServiceActorId; |
