aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-02-13 13:59:42 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-02-13 13:59:42 +0300
commitd74ccea9e86ca97c9960a4630366e69097999b07 (patch)
tree7ae9538188ffebbe7a4d8328df7217489e645842
parent98391f41335ba5fea7d080215bd3f0c6905248b4 (diff)
downloadydb-d74ccea9e86ca97c9960a4630366e69097999b07.tar.gz
correct planning policy. use maximal cpu with memory limits
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp173
2 files changed, 114 insertions, 64 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index ba19402fd22..5567d314f81 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -21,6 +21,7 @@ using namespace NYql;
// Task can allocate extra memory during execution.
// So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
+constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;
TKqpPlanner::TKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::TDqTask>&& computeTasks,
THashMap<ui64, TVector<NDqProto::TDqTask>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
@@ -95,8 +96,10 @@ void TKqpPlanner::Process() {
PrepareToProcess();
auto localResources = GetKqpResourceManager()->GetLocalResources();
+ Y_UNUSED(MEMORY_ESTIMATION_OVERFLOW);
if (LocalRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
- ResourceEstimations.size() <= localResources.ExecutionUnits)
+ ResourceEstimations.size() <= localResources.ExecutionUnits &&
+ ResourceEstimations.size() <= MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT)
{
RunLocal(ResourcesSnapshot);
return;
diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
index 07c1457cdec..5cd5bced3d2 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
@@ -17,47 +17,112 @@ using namespace NActors;
// So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
constexpr float TASK_MEMORY_ESTIMATION_OVERFLOW = 1.2f;
-class TKqpGreedyPlanner : public IKqpPlannerStrategy {
+class TNodesManager {
public:
- ~TKqpGreedyPlanner() override {}
-
- TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
- const TVector<TTaskResourceEstimation>& tasks) override
- {
- TVector<TResult> result;
+ struct TNodeDesc {
+ ui32 NodeId = std::numeric_limits<ui32>::max();
+ TActorId ResourceManagerId;
+ ui64 RemainsMemory = 0;
+ ui32 RemainsComputeActors = 0;
+ TVector<ui64> Tasks;
+ bool operator < (const TNodeDesc& item) const {
+ return std::tuple(-(i32)Tasks.size(), RemainsMemory, RemainsComputeActors)
+ < std::tuple(-(i32)item.Tasks.size(), item.RemainsMemory, item.RemainsComputeActors);
+ }
- struct TNodeDesc {
- ui32 NodeId = std::numeric_limits<ui32>::max();
- TActorId ResourceManagerId;
- ui64 RemainsMemory = 0;
- ui32 RemainsComputeActors = 0;
- TVector<ui64> Tasks;
- };
-
- struct TComp {
- bool operator ()(const TNodeDesc& l, const TNodeDesc& r) {
- return r.RemainsMemory > l.RemainsMemory && l.RemainsComputeActors > 0;
+ std::optional<IKqpPlannerStrategy::TResult> BuildResult() {
+ if (Tasks.empty()) {
+ return {};
}
- };
+ IKqpPlannerStrategy::TResult item;
+ item.NodeId = NodeId;
+ item.ResourceManager = ResourceManagerId;
+ item.TaskIds.swap(Tasks);
+ return item;
+ }
+ };
+private:
+ std::vector<TNodeDesc> Nodes;
+public:
+ std::vector<TNodeDesc>::iterator begin() {
+ return Nodes.begin();
+ }
+ std::vector<TNodeDesc>::iterator end() {
+ return Nodes.end();
+ }
+
+ std::optional<TNodeDesc> PopNode() {
+ if (Nodes.empty()) {
+ return {};
+ }
+ std::pop_heap(Nodes.begin(), Nodes.end());
+ auto result = std::move(Nodes.back());
+ Nodes.pop_back();
+ return result;
+ }
- TPriorityQueue<TNodeDesc, TVector<TNodeDesc>, TComp> nodes{TComp()};
+ void PushNode(TNodeDesc&& node) {
+ Nodes.emplace_back(std::move(node));
+ std::push_heap(Nodes.begin(), Nodes.end());
+ }
+
+ std::optional<TNodeDesc> PopOptimalNodeWithLimits(const ui64 memoryLimit, const ui32 actorsLimit) {
+ std::vector<TNodeDesc> localNodesWithNotEnoughResources;
+ std::optional<TNodeDesc> result;
+ while (true) {
+ if (Nodes.empty()) {
+ break;
+ }
+ std::pop_heap(Nodes.begin(), Nodes.end());
+ if (Nodes.back().RemainsComputeActors >= actorsLimit && Nodes.back().RemainsMemory >= memoryLimit) {
+ result = std::move(Nodes.back());
+ Nodes.pop_back();
+ break;
+ } else {
+ localNodesWithNotEnoughResources.emplace_back(std::move(Nodes.back()));
+ Nodes.pop_back();
+ }
+ }
+ for (auto&& i : localNodesWithNotEnoughResources) {
+ Nodes.emplace_back(std::move(i));
+ std::push_heap(Nodes.begin(), Nodes.end());
+ }
+ return result;
+ }
+ TNodesManager(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources) {
for (auto& node : nodeResources) {
- nodes.emplace(TNodeDesc{
+ if (!node.GetAvailableComputeActors()) {
+ continue;
+ }
+ Nodes.emplace_back(TNodeDesc{
node.GetNodeId(),
ActorIdFromProto(node.GetResourceManagerActorId()),
node.GetTotalMemory() - node.GetUsedMemory(),
node.GetAvailableComputeActors(),
{}
- });
+ });
+ }
+ std::make_heap(Nodes.begin(), Nodes.end());
+ }
+};
+class TKqpGreedyPlanner : public IKqpPlannerStrategy {
+public:
+ ~TKqpGreedyPlanner() override {}
+
+ TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
+ const TVector<TTaskResourceEstimation>& tasks) override
+ {
+ TVector<TResult> result;
+ TNodesManager nodes(nodeResources);
+ for (auto& node : nodeResources) {
if (LogFunc) {
LogFunc(TStringBuilder() << "[AvailableResources] node #" << node.GetNodeId()
<< " memory: " << (node.GetTotalMemory() - node.GetUsedMemory())
<< ", ca: " << node.GetAvailableComputeActors());
}
}
-
if (LogFunc) {
for (const auto& task : tasks) {
LogFunc(TStringBuilder() << "[TaskResources] task: " << task.TaskId << ", memory: " << task.TotalMemoryLimit);
@@ -65,57 +130,39 @@ public:
}
for (const auto& taskEstimation : tasks) {
- TNodeDesc node = nodes.top();
-
- if (node.RemainsComputeActors > 0 &&
- node.RemainsMemory > taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW)
- {
- nodes.pop();
- node.RemainsComputeActors--;
- node.RemainsMemory -= taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW;
- node.Tasks.push_back(taskEstimation.TaskId);
-
- if (LogFunc) {
- LogFunc(TStringBuilder() << "Schedule task: " << taskEstimation.TaskId
- << " (" << taskEstimation.TotalMemoryLimit << " bytes) "
- << "to node #" << node.NodeId << ". "
- << "Remains memory: " << node.RemainsMemory << ", ca: " << node.RemainsComputeActors);
- }
-
- nodes.emplace(std::move(node));
- } else {
+ auto node = nodes.PopOptimalNodeWithLimits(taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW, 1);
+ if (!node) {
if (LogFunc) {
TStringBuilder err;
err << "Not enough resources to execute query. Task " << taskEstimation.TaskId
- << " (" << taskEstimation.TotalMemoryLimit << " bytes) "
- << "Node: " << node.NodeId << ", remains memory: " << node.RemainsMemory
- << ", ca: " << node.RemainsComputeActors;
+ << " (" << taskEstimation.TotalMemoryLimit << " bytes) ";
LogFunc(err);
}
-
return result;
+ } else {
+ if (LogFunc) {
+ LogFunc(TStringBuilder() << "Schedule task: " << taskEstimation.TaskId
+ << " (" << taskEstimation.TotalMemoryLimit << " bytes) "
+ << "to node #" << node->NodeId << ". "
+ << "Remains memory: " << node->RemainsMemory << ", ca: " << node->RemainsComputeActors);
+ }
+ node->RemainsMemory -= taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW;
+ node->Tasks.emplace_back(taskEstimation.TaskId);
+ --node->RemainsComputeActors;
+ nodes.PushNode(std::move(*node));
}
}
- while (!nodes.empty()) {
- TNodeDesc node = nodes.top();
- nodes.pop();
-
- if (node.Tasks.empty()) {
- continue;
- }
-
- if (LogFunc) {
- LogFunc(TStringBuilder() << "About to execute tasks [" << JoinSeq(", ", node.Tasks) << "]"
- << " on node " << node.NodeId);
+ while (auto node = nodes.PopNode()) {
+ auto resultNode = node->BuildResult();
+ if (resultNode) {
+ if (LogFunc) {
+ LogFunc(TStringBuilder() << "About to execute tasks [" << JoinSeq(", ", resultNode->TaskIds) << "]"
+ << " on node " << resultNode->NodeId);
+ }
+ result.emplace_back(std::move(*resultNode));
}
-
- result.push_back({});
- TResult& item = result.back();
- item.NodeId = node.NodeId;
- item.ResourceManager = node.ResourceManagerId;
- item.TaskIds.swap(node.Tasks);
}
return result;