diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-13 13:59:42 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-13 13:59:42 +0300 |
commit | d74ccea9e86ca97c9960a4630366e69097999b07 (patch) | |
tree | 7ae9538188ffebbe7a4d8328df7217489e645842 | |
parent | 98391f41335ba5fea7d080215bd3f0c6905248b4 (diff) | |
download | ydb-d74ccea9e86ca97c9960a4630366e69097999b07.tar.gz |
correct planning policy. use maximal cpu with memory limits
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp | 173 |
2 files changed, 114 insertions, 64 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index ba19402fd22..5567d314f81 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -21,6 +21,7 @@ using namespace NYql; // Task can allocate extra memory during execution. // So, we estimate total memory amount required for task as apriori task size multiplied by this constant. constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; +constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks, THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, @@ -95,8 +96,10 @@ void TKqpPlanner::Process() { PrepareToProcess(); auto localResources = GetKqpResourceManager()->GetLocalResources(); + Y_UNUSED(MEMORY_ESTIMATION_OVERFLOW); if (LocalRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && - ResourceEstimations.size() <= localResources.ExecutionUnits) + ResourceEstimations.size() <= localResources.ExecutionUnits && + ResourceEstimations.size() <= MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT) { RunLocal(ResourcesSnapshot); return; diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp index 07c1457cdec..5cd5bced3d2 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp @@ -17,47 +17,112 @@ using namespace NActors; // So, we estimate total memory amount required for task as apriori task size multiplied by this constant. constexpr float TASK_MEMORY_ESTIMATION_OVERFLOW = 1.2f; -class TKqpGreedyPlanner : public IKqpPlannerStrategy { +class TNodesManager { public: - ~TKqpGreedyPlanner() override {} - - TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources, - const TVector<TTaskResourceEstimation>& tasks) override - { - TVector<TResult> result; + struct TNodeDesc { + ui32 NodeId = std::numeric_limits<ui32>::max(); + TActorId ResourceManagerId; + ui64 RemainsMemory = 0; + ui32 RemainsComputeActors = 0; + TVector<ui64> Tasks; + bool operator < (const TNodeDesc& item) const { + return std::tuple(-(i32)Tasks.size(), RemainsMemory, RemainsComputeActors) + < std::tuple(-(i32)item.Tasks.size(), item.RemainsMemory, item.RemainsComputeActors); + } - struct TNodeDesc { - ui32 NodeId = std::numeric_limits<ui32>::max(); - TActorId ResourceManagerId; - ui64 RemainsMemory = 0; - ui32 RemainsComputeActors = 0; - TVector<ui64> Tasks; - }; - - struct TComp { - bool operator ()(const TNodeDesc& l, const TNodeDesc& r) { - return r.RemainsMemory > l.RemainsMemory && l.RemainsComputeActors > 0; + std::optional<IKqpPlannerStrategy::TResult> BuildResult() { + if (Tasks.empty()) { + return {}; } - }; + IKqpPlannerStrategy::TResult item; + item.NodeId = NodeId; + item.ResourceManager = ResourceManagerId; + item.TaskIds.swap(Tasks); + return item; + } + }; +private: + std::vector<TNodeDesc> Nodes; +public: + std::vector<TNodeDesc>::iterator begin() { + return Nodes.begin(); + } + std::vector<TNodeDesc>::iterator end() { + return Nodes.end(); + } + + std::optional<TNodeDesc> PopNode() { + if (Nodes.empty()) { + return {}; + } + std::pop_heap(Nodes.begin(), Nodes.end()); + auto result = std::move(Nodes.back()); + Nodes.pop_back(); + return result; + } - TPriorityQueue<TNodeDesc, TVector<TNodeDesc>, TComp> nodes{TComp()}; + void PushNode(TNodeDesc&& node) { + Nodes.emplace_back(std::move(node)); + std::push_heap(Nodes.begin(), Nodes.end()); + } + + std::optional<TNodeDesc> PopOptimalNodeWithLimits(const ui64 memoryLimit, const ui32 actorsLimit) { + std::vector<TNodeDesc> localNodesWithNotEnoughResources; + std::optional<TNodeDesc> result; + while (true) { + if (Nodes.empty()) { + break; + } + std::pop_heap(Nodes.begin(), Nodes.end()); + if (Nodes.back().RemainsComputeActors >= actorsLimit && Nodes.back().RemainsMemory >= memoryLimit) { + result = std::move(Nodes.back()); + Nodes.pop_back(); + break; + } else { + localNodesWithNotEnoughResources.emplace_back(std::move(Nodes.back())); + Nodes.pop_back(); + } + } + for (auto&& i : localNodesWithNotEnoughResources) { + Nodes.emplace_back(std::move(i)); + std::push_heap(Nodes.begin(), Nodes.end()); + } + return result; + } + TNodesManager(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources) { for (auto& node : nodeResources) { - nodes.emplace(TNodeDesc{ + if (!node.GetAvailableComputeActors()) { + continue; + } + Nodes.emplace_back(TNodeDesc{ node.GetNodeId(), ActorIdFromProto(node.GetResourceManagerActorId()), node.GetTotalMemory() - node.GetUsedMemory(), node.GetAvailableComputeActors(), {} - }); + }); + } + std::make_heap(Nodes.begin(), Nodes.end()); + } +}; +class TKqpGreedyPlanner : public IKqpPlannerStrategy { +public: + ~TKqpGreedyPlanner() override {} + + TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources, + const TVector<TTaskResourceEstimation>& tasks) override + { + TVector<TResult> result; + TNodesManager nodes(nodeResources); + for (auto& node : nodeResources) { if (LogFunc) { LogFunc(TStringBuilder() << "[AvailableResources] node #" << node.GetNodeId() << " memory: " << (node.GetTotalMemory() - node.GetUsedMemory()) << ", ca: " << node.GetAvailableComputeActors()); } } - if (LogFunc) { for (const auto& task : tasks) { LogFunc(TStringBuilder() << "[TaskResources] task: " << task.TaskId << ", memory: " << task.TotalMemoryLimit); @@ -65,57 +130,39 @@ public: } for (const auto& taskEstimation : tasks) { - TNodeDesc node = nodes.top(); - - if (node.RemainsComputeActors > 0 && - node.RemainsMemory > taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW) - { - nodes.pop(); - node.RemainsComputeActors--; - node.RemainsMemory -= taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW; - node.Tasks.push_back(taskEstimation.TaskId); - - if (LogFunc) { - LogFunc(TStringBuilder() << "Schedule task: " << taskEstimation.TaskId - << " (" << taskEstimation.TotalMemoryLimit << " bytes) " - << "to node #" << node.NodeId << ". " - << "Remains memory: " << node.RemainsMemory << ", ca: " << node.RemainsComputeActors); - } - - nodes.emplace(std::move(node)); - } else { + auto node = nodes.PopOptimalNodeWithLimits(taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW, 1); + if (!node) { if (LogFunc) { TStringBuilder err; err << "Not enough resources to execute query. Task " << taskEstimation.TaskId - << " (" << taskEstimation.TotalMemoryLimit << " bytes) " - << "Node: " << node.NodeId << ", remains memory: " << node.RemainsMemory - << ", ca: " << node.RemainsComputeActors; + << " (" << taskEstimation.TotalMemoryLimit << " bytes) "; LogFunc(err); } - return result; + } else { + if (LogFunc) { + LogFunc(TStringBuilder() << "Schedule task: " << taskEstimation.TaskId + << " (" << taskEstimation.TotalMemoryLimit << " bytes) " + << "to node #" << node->NodeId << ". " + << "Remains memory: " << node->RemainsMemory << ", ca: " << node->RemainsComputeActors); + } + node->RemainsMemory -= taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW; + node->Tasks.emplace_back(taskEstimation.TaskId); + --node->RemainsComputeActors; + nodes.PushNode(std::move(*node)); } } - while (!nodes.empty()) { - TNodeDesc node = nodes.top(); - nodes.pop(); - - if (node.Tasks.empty()) { - continue; - } - - if (LogFunc) { - LogFunc(TStringBuilder() << "About to execute tasks [" << JoinSeq(", ", node.Tasks) << "]" - << " on node " << node.NodeId); + while (auto node = nodes.PopNode()) { + auto resultNode = node->BuildResult(); + if (resultNode) { + if (LogFunc) { + LogFunc(TStringBuilder() << "About to execute tasks [" << JoinSeq(", ", resultNode->TaskIds) << "]" + << " on node " << resultNode->NodeId); + } + result.emplace_back(std::move(*resultNode)); } - - result.push_back({}); - TResult& item = result.back(); - item.NodeId = node.NodeId; - item.ResourceManager = node.ResourceManagerId; - item.TaskIds.swap(node.Tasks); } return result; |