diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-21 10:45:32 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-21 10:45:32 +0300 |
commit | f693cb52d2b30cfce40dd0f94cdf4bfa2db01b5e (patch) | |
tree | c18918695a7b56a2f4f1ac09440b30fdbc5ae584 | |
parent | c898c805447c595e47f23ce17af55d546b67ffb9 (diff) | |
download | ydb-f693cb52d2b30cfce40dd0f94cdf4bfa2db01b5e.tar.gz |
encapsulate tasks calculation logic for kqp
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 57 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_ut.cpp | 65 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_predictor.cpp | 174 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_predictor.h | 47 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 38 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_estimation.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_estimation.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/proto/dq_tasks.proto | 11 |
16 files changed, 347 insertions, 98 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 23f6cdfadea..bdedcd082af 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -172,7 +172,8 @@ void TKqpPlanner::ProcessTasksForScanExecuter() { LOG_E("Not enough resources to execute query locally and no information about other nodes"); auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, - "Not enough resources to execute query locally and no information about other nodes (estimation: " + ToString(LocalRunMemoryEst) + ")"); + "Not enough resources to execute query locally and no information about other nodes (estimation: " + + ToString(LocalRunMemoryEst) + ";" + GetEstimationsInfo() + ")"); TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release())); return; @@ -237,6 +238,17 @@ void TKqpPlanner::ProcessTasksForScanExecuter() { } } +TString TKqpPlanner::GetEstimationsInfo() const { + TStringStream ss; + ss << "ComputeTasks:" << ComputeTasks.size() << ";NodeTasks:"; + if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) { + ss << it->second.size() << ";"; + } else { + ss << "0;"; + } + return ss.Str(); +} + void TKqpPlanner::PrepareToProcess() { auto rmConfig = GetKqpResourceManager()->GetConfig(); @@ -249,12 +261,12 @@ void TKqpPlanner::PrepareToProcess() { LocalRunMemoryEst = 0; for (size_t i = 0; i < ComputeTasks.size(); ++i) { - EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i]); + EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i], ComputeTasks.size()); LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit; } if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) { for (size_t i = 0; i < it->second.size(); ++i) { - EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + 
ComputeTasks.size()]); + EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()], it->second.size()); LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit; } } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 1fc08d0f266..025a0dcbffe 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -43,6 +43,7 @@ public: ui32 GetCurrentRetryDelay(ui32 requestId); private: void PrepareToProcess(); + TString GetEstimationsInfo() const; void RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index f61622abbcf..db98b839d12 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -12,6 +12,7 @@ #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> #include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/query_compiler/kqp_predictor.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> #include <ydb/core/kqp/runtime/kqp_transport.h> #include <ydb/core/kqp/opt/kqp_query_plan.h> @@ -209,25 +210,24 @@ private: } }; - ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan) const { + ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const { ui32 result = 0; + const auto& stage = GetStage(stageInfo); if (isOlapScan) { if (AggregationSettings.HasCSScanMinimalThreads()) { result = AggregationSettings.GetCSScanMinimalThreads(); } else { - std::optional<ui32> userPoolSize; - if (TlsActivationContext && TlsActivationContext->ActorSystem()) { - userPoolSize = TlsActivationContext->ActorSystem()->GetPoolThreadsCount(AppData()->UserPoolId); - } - if (!userPoolSize) { - ALS_ERROR(NKikimrServices::KQP_EXECUTER) 
<< "user pool is undefined for executer tasks construction"; - userPoolSize = NSystemInfo::NumberOfCpus(); + TStagePredictor predictor; + const ui32 threadsCount = TStagePredictor::GetUsableThreads(); + if (!predictor.DeserializeFromKqpSettings(stage.GetProgram().GetSettings())) { + ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction"; + return threadsCount; + } else { + return std::max<ui32>(1, predictor.CalcTasksOptimalCount(threadsCount, {})); } - result = *userPoolSize; } } else { result = AggregationSettings.GetDSScanMinimalThreads(); - const auto& stage = GetStage(stageInfo); if (stage.GetProgram().GetSettings().GetHasSort()) { result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads()); } @@ -238,19 +238,19 @@ private: return Max<ui32>(1, result); } - ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount) const { - ui32 result = Max<ui32>(1, previousTasksCount / 2); - const auto& stage = GetStage(stageInfo); - if (stage.GetProgram().GetSettings().GetHasAggregation() && !stage.GetProgram().GetSettings().GetHasFilter()) { - result *= AggregationSettings.GetAggregationHardThreadsKff(); - } - if (stage.GetProgram().GetSettings().GetHasSort()) { - result *= AggregationSettings.GetAggregationSortThreadsKff(); - } - if (stage.GetProgram().GetSettings().GetHasMapJoin()) { - result *= AggregationSettings.GetAggregationJoinThreadsKff(); + ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount, const ui32 nodesCount) const { + if (AggregationSettings.HasAggregationComputeThreads()) { + return std::max<ui32>(1, AggregationSettings.GetAggregationComputeThreads()); + } else { + const auto& stage = GetStage(stageInfo); + TStagePredictor predictor; + if (!predictor.DeserializeFromKqpSettings(stage.GetProgram().GetSettings())) { + ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction"; + return std::max<ui32>(1, 
previousTasksCount * 0.75); + } else { + return predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount) * nodesCount; + } } - return result; } TTask& AssignTaskToShard( @@ -268,8 +268,7 @@ private: auto& tasks = nodeTasks[nodeId]; auto& cnt = assignedShardsCount[nodeId]; - - const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo, isOlapScan); + const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo, isOlapScan, nodeId); if (cnt < maxScansPerNode) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.NodeId = nodeId; @@ -286,7 +285,6 @@ private: void BuildScanTasks(TStageInfo& stageInfo) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; - auto& stage = GetStage(stageInfo); const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId); @@ -384,6 +382,8 @@ private: auto& stage = GetStage(stageInfo); ui32 partitionsCount = 1; + ui32 inputTasks = 0; + bool isShuffle = false; for (ui32 inputIndex = 0; inputIndex < stage.InputsSize(); ++inputIndex) { const auto& input = stage.GetInputs(inputIndex); @@ -411,7 +411,8 @@ private: switch (input.GetTypeCase()) { case NKqpProto::TKqpPhyConnection::kHashShuffle: { - partitionsCount = Max<ui32>(1, GetMaxTasksAggregation(stageInfo, originStageInfo.Tasks.size())); + inputTasks += originStageInfo.Tasks.size(); + isShuffle = true; break; } @@ -425,6 +426,10 @@ private: } } + if (isShuffle) { + partitionsCount = std::max(partitionsCount, GetMaxTasksAggregation(stageInfo, inputTasks, ShardsOnNode.size())); + } + for (ui32 i = 0; i < partitionsCount; ++i) { auto& task = TasksGraph.AddTask(stageInfo); LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id); diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 0fd6b6a3a15..7c4bf8787fc 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -149,7 
+149,7 @@ private: ui32 requestChannels = 0; for (auto& dqTask : *msg.MutableTasks()) { - auto estimation = EstimateTaskResources(dqTask, Config); + auto estimation = EstimateTaskResources(dqTask, Config, msg.GetTasks().size()); LOG_D("Resource estimation complete" << ", TxId: " << txId << ", task id: " << dqTask.GetId() << ", node id: " << SelfId().NodeId() << ", estimated resources: " << estimation.ToString()); diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index 273c301fc9b..88a852aed7f 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -270,7 +270,11 @@ void KqpNode::CommonCase() { TActorId sender1 = Runtime->AllocateEdgeActor(); TActorId sender2 = Runtime->AllocateEdgeActor(); - const ui64 taskSize = 1'000 + 2 * 10; + const ui64 additionalSize = 2 * 10; + const ui64 fullMkqlLimit = 1'000; +// const ui64 taskSize = fullMkqlLimit + additionalSize; + const ui64 tasksSize12 = fullMkqlLimit + 2 * additionalSize; + const ui64 tasksSize22 = 2 * tasksSize12; //for 2 requests // first request SendStartTasksRequest(sender1, /* txId */ 1, /* taskIds */ {1, 2}); @@ -292,9 +296,9 @@ void KqpNode::CommonCase() { UNIT_ASSERT_VALUES_EQUAL(10'000, memoryLimits.MkqlHeavyProgramMemoryLimit); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2); - UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize); + UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), tasksSize12); - AssertResourceBrokerSensors(0, 2 * taskSize, 0, 0, 2); + AssertResourceBrokerSensors(0, tasksSize12, 0, 0, 2); } Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); @@ -319,7 +323,7 @@ void KqpNode::CommonCase() { UNIT_ASSERT_VALUES_EQUAL(98, payload.GetExecutionUnits()); UNIT_ASSERT_VALUES_EQUAL(1, payload.GetMemory().size()); UNIT_ASSERT_VALUES_EQUAL((ui32) NRm::EKqpMemoryPool::ScanQuery, payload.GetMemory()[0].GetPool()); - 
UNIT_ASSERT_VALUES_EQUAL(cfg.GetResourceManager().GetQueryMemoryLimit() - 2 * taskSize, + UNIT_ASSERT_VALUES_EQUAL(cfg.GetResourceManager().GetQueryMemoryLimit() - tasksSize12, payload.GetMemory()[0].GetAvailable()); } @@ -335,7 +339,7 @@ void KqpNode::CommonCase() { UNIT_ASSERT_EQUAL(NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR, notStartedTask.GetReason()); } - AssertResourceBrokerSensors(0, 2 * taskSize, 0, 0, 2); + AssertResourceBrokerSensors(0, tasksSize12, 0, 0, 2); } // second request @@ -355,9 +359,9 @@ void KqpNode::CommonCase() { UNIT_ASSERT(CompFactory->Task2Actor.contains(4)); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 4); - UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 4 * taskSize); + UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), tasksSize22); - AssertResourceBrokerSensors(0, 4 * taskSize, 0, 0, 4); + AssertResourceBrokerSensors(0, tasksSize22, 0, 0, 4); } // request extra resources for taskId 4 @@ -369,8 +373,8 @@ void KqpNode::CommonCase() { UNIT_ASSERT(allocated); DispatchKqpNodePostponedEvents(sender1); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 4); - UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 4 * taskSize + 100); - AssertResourceBrokerSensors(0, 4 * taskSize + 100, 0, 1, 4); + UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), tasksSize22 + 100); + AssertResourceBrokerSensors(0, tasksSize22 + 100, 0, 1, 4); } // complete tasks @@ -383,9 +387,10 @@ void KqpNode::CommonCase() { SendFinishTask(mockCA.ActorId, taskId < 3 ? 
1 : 2, taskId); { UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), (i64) CompFactory->Task2Actor.size()); - UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), (i64) (4 - taskId) * taskSize + extraMem); + const ui64 tasksMemMkql = fullMkqlLimit * (4 - taskId) / 2; + UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), (i64) (4 - taskId) * additionalSize + extraMem + tasksMemMkql); - AssertResourceBrokerSensors(0, (4 - taskId) * taskSize + extraMem, 0, 1 + taskId, 4 - taskId); + AssertResourceBrokerSensors(0, (4 - taskId) * additionalSize + extraMem + tasksMemMkql, 0, 1 + taskId, 4 - taskId); } } @@ -399,7 +404,9 @@ void KqpNode::ExtraAllocation() { TActorId sender1 = Runtime->AllocateEdgeActor(); - const ui64 taskSize = 1'000 + 2 * 10; + const ui64 mkqlLimit = 1'000; + const ui64 additionalSize = 2 * 10; + const ui64 taskSize2 = mkqlLimit + additionalSize * 2; SendStartTasksRequest(sender1, /* txId */ 1, /* taskIds */ {1, 2}); Runtime->GrabEdgeEvent<TEvKqpNode::TEvStartKqpTasksResponse>(sender1); @@ -415,8 +422,8 @@ void KqpNode::ExtraAllocation() { DispatchKqpNodePostponedEvents(sender1); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2); - UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize + 100); - AssertResourceBrokerSensors(0, 2 * taskSize + 100, 0, 1, 2); + UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), taskSize2 + 100); + AssertResourceBrokerSensors(0, taskSize2 + 100, 0, 1, 2); } // too big request @@ -428,10 +435,10 @@ void KqpNode::ExtraAllocation() { DispatchKqpNodePostponedEvents(sender1); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2); - UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize + 100); + UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), taskSize2 + 100); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughMemory->Val(), 1); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0); - AssertResourceBrokerSensors(0, 2 * taskSize + 100, 
0, 1, 2); + AssertResourceBrokerSensors(0, taskSize2 + 100, 0, 1, 2); } } @@ -462,7 +469,9 @@ void KqpNode::NotEnoughMemory_Extra() { TActorId sender1 = Runtime->AllocateEdgeActor(); - const ui64 taskSize = 1'000 + 2 * 10; + const ui64 mkqlLimit = 1'000; + const ui64 additionalSize = 2 * 10; + const ui64 taskSize2 = mkqlLimit + additionalSize * 2; // first request SendStartTasksRequest(sender1, /* txId */ (ui64)1, /* taskIds */ {1, 2}); @@ -484,9 +493,9 @@ void KqpNode::NotEnoughMemory_Extra() { UNIT_ASSERT_VALUES_EQUAL(10'000, memoryLimits.MkqlHeavyProgramMemoryLimit); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2); - UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize); + UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), taskSize2); - AssertResourceBrokerSensors(0, 2 * taskSize, 0, 0, 2); + AssertResourceBrokerSensors(0, taskSize2, 0, 0, 2); } { @@ -500,11 +509,11 @@ void KqpNode::NotEnoughMemory_Extra() { DispatchKqpNodePostponedEvents(sender1); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2); - UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize); + UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), taskSize2); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughMemory->Val(), 1); UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0); - AssertResourceBrokerSensors(0, 2 * taskSize, 0, 0, 2); + AssertResourceBrokerSensors(0, taskSize2, 0, 0, 2); } void KqpNode::NotEnoughComputeActors() { @@ -532,7 +541,7 @@ void KqpNode::NotEnoughComputeActors() { void KqpNode::ResourceBrokerNotEnoughResources() { auto cfg = MakeKqpResourceManagerConfig(); cfg.MutableResourceManager()->SetChannelBufferSize(6'000); - cfg.MutableResourceManager()->SetQueryMemoryLimit(100'000); + cfg.MutableResourceManager()->SetQueryMemoryLimit(49'000); CreateKqpNode(cfg); TActorId sender1 = Runtime->AllocateEdgeActor(); @@ -542,7 +551,7 @@ void KqpNode::ResourceBrokerNotEnoughResources() { { auto answer = 
Runtime->GrabEdgeEvent<TEvKqpNode::TEvStartKqpTasksResponse>(sender1); Y_UNUSED(answer); - AssertResourceBrokerSensors(0, 26'000, 0, 0, 2); + AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 0, 2); } SendStartTasksRequest(sender2, 2, {3, 4}); @@ -557,13 +566,13 @@ void KqpNode::ResourceBrokerNotEnoughResources() { } } - AssertResourceBrokerSensors(0, 26'000, 0, 1, 2); + AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 1, 2); } void KqpNode::ResourceBrokerNotEnoughResources_Extra() { auto cfg = MakeKqpResourceManagerConfig(); cfg.MutableResourceManager()->SetChannelBufferSize(6'000); - cfg.MutableResourceManager()->SetQueryMemoryLimit(100'000); + cfg.MutableResourceManager()->SetQueryMemoryLimit(49'000); CreateKqpNode(cfg); TActorId sender1 = Runtime->AllocateEdgeActor(); @@ -572,18 +581,18 @@ void KqpNode::ResourceBrokerNotEnoughResources_Extra() { { auto answer = Runtime->GrabEdgeEvent<TEvKqpNode::TEvStartKqpTasksResponse>(sender1); Y_UNUSED(answer); - AssertResourceBrokerSensors(0, 26'000, 0, 0, 2); + AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 0, 2); } { NKikimr::TActorSystemStub stub; auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn; - bool allocated = task1ExtraAlloc((ui64)1, 1, 26'000); + bool allocated = task1ExtraAlloc((ui64)1, 1, 2 * (6000 * 2 + 1000 / 2)); UNIT_ASSERT(!allocated); } - AssertResourceBrokerSensors(0, 26'000, 0, 0, 2); + AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 0, 2); } void KqpNode::ExecuterLost() { diff --git a/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt b/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt index c848e589bdd..18165e775a3 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt @@ -23,4 +23,5 @@ target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp ) diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt index bd6045c8f04..731299f3f56 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt @@ -24,4 +24,5 @@ target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp ) diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux.txt index bd6045c8f04..731299f3f56 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.linux.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.linux.txt @@ -24,4 +24,5 @@ target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp ) diff --git a/ydb/core/kqp/query_compiler/kqp_predictor.cpp b/ydb/core/kqp/query_compiler/kqp_predictor.cpp new file mode 100644 index 00000000000..293ad98e812 --- /dev/null +++ b/ydb/core/kqp/query_compiler/kqp_predictor.cpp @@ -0,0 +1,174 @@ +#include "kqp_predictor.h" +#include <ydb/core/kqp/opt/kqp_opt.h> +#include <ydb/core/base/appdata.h> +#include <ydb/library/yql/core/yql_expr_optimize.h> +#include <util/system/info.h> + +namespace NKikimr::NKqp { + +void TStagePredictor::Prepare() { + InputDataPrediction = 1; + if 
(HasLookupFlag) { + InputDataPrediction = 0.5; + } else if (HasRangeScanFlag) { + InputDataPrediction = 1; + } else if (InputDataVolumes.size()) { + InputDataPrediction = 0; + } + + for (auto&& i : InputDataVolumes) { + InputDataPrediction += i; + } + + OutputDataPrediction = InputDataPrediction; + if (HasTopFlag) { + OutputDataPrediction = InputDataPrediction * 0.01; + } else if (HasStateCombinerFlag || HasFinalCombinerFlag) { + if (GroupByKeysCount) { + OutputDataPrediction = InputDataPrediction; + } else { + OutputDataPrediction = InputDataPrediction * 0.01; + } + } +} + +void TStagePredictor::Scan(const NYql::TExprNode::TPtr& stageNode) { + NYql::VisitExpr(stageNode, [&](const NYql::TExprNode::TPtr& exprNode) { + NYql::NNodes::TExprBase node(exprNode); + if (node.Maybe<NYql::NNodes::TKqpWideReadTable>()) { + HasRangeScanFlag = true; + } else if (node.Maybe<NYql::NNodes::TKqpLookupTable>()) { + HasLookupFlag = true; + } else if (node.Maybe<NYql::NNodes::TKqpUpsertRows>()) { + } else if (node.Maybe<NYql::NNodes::TKqpDeleteRows>()) { + + } else if (node.Maybe<NYql::NNodes::TKqpWideReadTableRanges>()) { + HasRangeScanFlag = true; + } else if (node.Maybe<NYql::NNodes::TKqpWideReadOlapTableRanges>()) { + HasRangeScanFlag = true; + } else if (node.Maybe<NYql::NNodes::TCoSort>()) { + HasSortFlag = true; + } else if (node.Maybe<NYql::NNodes::TCoKeepTop>() || node.Maybe<NYql::NNodes::TCoTop>() || node.Maybe<NYql::NNodes::TCoWideTop>()) { + HasTopFlag = true; + } else if (node.Maybe<NYql::NNodes::TCoTopSort>() || node.Maybe<NYql::NNodes::TCoWideTopSort>()) { + HasTopFlag = true; + HasSortFlag = true; + } else if (node.Maybe<NYql::NNodes::TCoFilterBase>()) { + HasFilterFlag = true; + } else if (node.Maybe<NYql::NNodes::TCoWideCombiner>()) { + auto wCombiner = node.Cast<NYql::NNodes::TCoWideCombiner>(); + GroupByKeysCount = wCombiner.KeyExtractor().Ptr()->ChildrenSize() - 1; + if (wCombiner.MemLimit() != "") { + HasFinalCombinerFlag = true; + } else { + HasStateCombinerFlag 
= true; + } + } else if (node.Maybe<NYql::NNodes::TCoMapJoinCore>()) { + HasMapJoinFlag = true; + } else if (node.Maybe<NYql::NNodes::TCoUdf>()) { + HasUdfFlag = true; + } + return true; + }); +} + +void TStagePredictor::AcceptInputStageInfo(const TStagePredictor& info, const NYql::NNodes::TDqConnection& /*connection*/) { + StageLevel = Max<ui32>(StageLevel, info.StageLevel + 1); + InputDataVolumes.emplace_back(info.GetOutputDataPrediction()); +} + +void TStagePredictor::SerializeToKqpSettings(NYql::NDqProto::TProgram::TSettings& kqpProto) const { + kqpProto.SetHasMapJoin(HasMapJoinFlag); + kqpProto.SetHasSort(HasSortFlag); + kqpProto.SetHasUdf(HasUdfFlag); + kqpProto.SetHasFinalAggregation(HasFinalCombinerFlag); + kqpProto.SetHasStateAggregation(HasStateCombinerFlag); + kqpProto.SetGroupByKeysCount(GroupByKeysCount); + kqpProto.SetHasFilter(HasFilterFlag); + kqpProto.SetHasTop(HasTopFlag); + kqpProto.SetHasRangeScan(HasRangeScanFlag); + kqpProto.SetHasLookup(HasLookupFlag); + kqpProto.SetInputDataPrediction(InputDataPrediction); + kqpProto.SetOutputDataPrediction(OutputDataPrediction); + kqpProto.SetStageLevel(StageLevel); + kqpProto.SetLevelDataPrediction(LevelDataPrediction.value_or(1)); +} + +bool TStagePredictor::DeserializeFromKqpSettings(const NYql::NDqProto::TProgram::TSettings& kqpProto) { + HasMapJoinFlag = kqpProto.GetHasMapJoin(); + HasSortFlag = kqpProto.GetHasSort(); + HasUdfFlag = kqpProto.GetHasUdf(); + HasFinalCombinerFlag = kqpProto.GetHasFinalAggregation(); + HasStateCombinerFlag = kqpProto.GetHasStateAggregation(); + GroupByKeysCount = kqpProto.GetGroupByKeysCount(); + HasFilterFlag = kqpProto.GetHasFilter(); + HasTopFlag = kqpProto.GetHasTop(); + HasRangeScanFlag = kqpProto.GetHasRangeScan(); + HasLookupFlag = kqpProto.GetHasLookup(); + InputDataPrediction = kqpProto.GetInputDataPrediction(); + OutputDataPrediction = kqpProto.GetOutputDataPrediction(); + StageLevel = kqpProto.GetStageLevel(); + LevelDataPrediction = 
kqpProto.GetLevelDataPrediction(); + return true; +} + +ui32 TStagePredictor::GetUsableThreads() { + std::optional<ui32> userPoolSize; + if (TlsActivationContext && TlsActivationContext->ActorSystem()) { + userPoolSize = TlsActivationContext->ActorSystem()->GetPoolThreadsCount(AppData()->UserPoolId); + } + if (!userPoolSize) { + ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "user pool is undefined for executer tasks construction"; + userPoolSize = NSystemInfo::NumberOfCpus(); + } + return Max<ui32>(1, *userPoolSize); +} + +ui32 TStagePredictor::CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount) const { + ui32 result = 0; + if (!LevelDataPrediction || *LevelDataPrediction == 0) { + ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "level difficult not defined for correct calculation"; + result = availableThreadsCount; + } else { + result = (availableThreadsCount - previousStageTasksCount.value_or(0) * 0.25) * (InputDataPrediction / *LevelDataPrediction); + } + if (previousStageTasksCount) { + result = std::min<ui32>(result, *previousStageTasksCount); + } + return std::max<ui32>(1, result); +} + +TStagePredictor& TRequestPredictor::BuildForStage(const NYql::NNodes::TDqPhyStage& stage, NYql::TExprContext& ctx) { + StagePredictors.emplace_back(); + TStagePredictor& result = StagePredictors.back(); + StagesMap.emplace(stage.Ref().UniqueId(), &result); + result.Scan(stage.Program().Ptr()); + + for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { + const auto& input = stage.Inputs().Item(inputIndex); + + if (input.Maybe<NYql::NNodes::TDqSource>()) { + } else { + YQL_ENSURE(input.Maybe<NYql::NNodes::TDqConnection>()); + auto connection = input.Cast<NYql::NNodes::TDqConnection>(); + auto it = StagesMap.find(connection.Output().Stage().Ref().UniqueId()); + YQL_ENSURE(it != StagesMap.end(), "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map for prediction: " + << 
PrintKqpStageOnly(connection.Output().Stage(), ctx)); + result.AcceptInputStageInfo(*it->second, connection); + } + } + result.Prepare(); + return result; +} + +double TRequestPredictor::GetLevelDataVolume(const ui32 level) const { + double result = 0; + for (auto&& i : StagePredictors) { + if (i.GetStageLevel() == level) { + result += i.GetInputDataPrediction(); + } + } + return result; +} + +} diff --git a/ydb/core/kqp/query_compiler/kqp_predictor.h b/ydb/core/kqp/query_compiler/kqp_predictor.h new file mode 100644 index 00000000000..8195583876f --- /dev/null +++ b/ydb/core/kqp/query_compiler/kqp_predictor.h @@ -0,0 +1,47 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> +#include <ydb/library/yql/dq/proto/dq_tasks.pb.h> + +namespace NKikimr::NKqp { +class TStagePredictor { +private: + YDB_READONLY_FLAG(HasFinalCombiner, false); + YDB_READONLY_FLAG(HasStateCombiner, false); + YDB_READONLY(ui32, GroupByKeysCount, 0); + + YDB_READONLY_FLAG(HasSort, false); + YDB_READONLY_FLAG(HasMapJoin, false); + YDB_READONLY_FLAG(HasUdf, false); + YDB_READONLY_FLAG(HasFilter, false); + YDB_READONLY_FLAG(HasTop, false); + YDB_READONLY_FLAG(HasRangeScan, false); + YDB_READONLY_FLAG(HasLookup, false); + + YDB_READONLY(double, InputDataPrediction, 1); + YDB_READONLY(double, OutputDataPrediction, 1); + YDB_OPT(double, LevelDataPrediction); + YDB_READONLY(ui32, StageLevel, 0); + std::vector<double> InputDataVolumes; + void Prepare(); + friend class TRequestPredictor; +public: + void Scan(const NYql::TExprNode::TPtr& stageNode); + void AcceptInputStageInfo(const TStagePredictor& info, const NYql::NNodes::TDqConnection& connection); + void SerializeToKqpSettings(NYql::NDqProto::TProgram::TSettings& kqpProto) const; + bool DeserializeFromKqpSettings(const NYql::NDqProto::TProgram::TSettings& kqpProto); + static ui32 GetUsableThreads(); + + ui32 CalcTasksOptimalCount(const ui32 availableThreadsCount, const 
std::optional<ui32> previousStageTasksCount) const; +}; + +class TRequestPredictor { +private: + std::deque<TStagePredictor> StagePredictors; + std::map<ui64, TStagePredictor*> StagesMap; +public: + double GetLevelDataVolume(const ui32 level) const; + TStagePredictor& BuildForStage(const NYql::NNodes::TDqPhyStage& stage, NYql::TExprContext& ctx); +}; + +} diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 4a25ca0abe9..db64a5d2325 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1,4 +1,5 @@ #include "kqp_query_compiler.h" +#include "kqp_predictor.h" #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/query_compiler/kqp_mkql_compiler.h> @@ -523,7 +524,7 @@ public: [](const TItemExprType* first, const TItemExprType* second) { return first->GetName() < second->GetName(); }); - inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(), + inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(), [](const TItemExprType* first, const TItemExprType* second) { return first->GetName() == second->GetName(); }), @@ -543,10 +544,13 @@ private: } void CompileStage(const TDqPhyStage& stage, NKqpProto::TKqpPhyStage& stageProto, TExprContext& ctx, - const TMap<ui64, ui32>& stagesMap, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) + const TMap<ui64, ui32>& stagesMap, TRequestPredictor& rPredictor, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) { stageProto.SetIsEffectsStage(NOpt::IsKqpEffectsStage(stage)); + TStagePredictor& stagePredictor = rPredictor.BuildForStage(stage, ctx); + stagePredictor.Scan(stage.Program().Ptr()); + for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { const auto& input = stage.Inputs().Item(inputIndex); @@ -564,11 +568,6 @@ private: } } - bool hasSort = false; - bool hasMapJoin = false; - bool hasUdf = false; - bool hasFilter = 
false; - bool hasWideCombiner = false; VisitExpr(stage.Program().Ptr(), [&](const TExprNode::TPtr& exprNode) { TExprBase node(exprNode); if (auto maybeReadTable = node.Maybe<TKqpWideReadTable>()) { @@ -652,20 +651,9 @@ private: FillOlapProgram(readTableRanges.Process(), miniKqlResultType, *tableMeta, *tableOp.MutableReadOlapRange()); FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange()); tableOp.MutableReadOlapRange()->SetReadType(NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS); - } else if (node.Maybe<TCoSort>()) { - hasSort = true; - } else if (node.Maybe<TCoFilterBase>()) { - hasFilter = true; - } else if (node.Maybe<TCoWideCombiner>()) { - hasWideCombiner = true; - } else if (node.Maybe<TCoMapJoinCore>()) { - hasMapJoin = true; - } else if (node.Maybe<TCoUdf>()) { - hasUdf = true; } else { YQL_ENSURE(!node.Maybe<TKqpReadTable>()); } - return true; }); @@ -695,11 +683,8 @@ private: auto& programProto = *stageProto.MutableProgram(); programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); programProto.SetRaw(programBytecode); - programProto.MutableSettings()->SetHasMapJoin(hasMapJoin); - programProto.MutableSettings()->SetHasSort(hasSort); - programProto.MutableSettings()->SetHasUdf(hasUdf); - programProto.MutableSettings()->SetHasAggregation(hasWideCombiner); - programProto.MutableSettings()->SetHasFilter(hasFilter); + + stagePredictor.SerializeToKqpSettings(*programProto.MutableSettings()); for (auto member : paramsType->GetItems()) { auto paramName = TString(member->GetName()); @@ -720,12 +705,17 @@ private: TMap<ui64, ui32> stagesMap; THashMap<TStringBuf, THashSet<TStringBuf>> tablesMap; + TRequestPredictor rPredictor; for (const auto& stage : tx.Stages()) { auto* protoStage = txProto.AddStages(); - CompileStage(stage, *protoStage, ctx, stagesMap, tablesMap); + CompileStage(stage, *protoStage, ctx, stagesMap, rPredictor, tablesMap); hasEffectStage |= protoStage->GetIsEffectsStage(); stagesMap[stage.Ref().UniqueId()] = 
txProto.StagesSize() - 1; } + for (auto&& i : *txProto.MutableStages()) { + i.MutableProgram()->MutableSettings()->SetLevelDataPrediction(rPredictor.GetLevelDataVolume(i.GetProgram().GetSettings().GetStageLevel())); + } + YQL_ENSURE(hasEffectStage == txSettings.WithEffects); diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp index cd96733a969..ff279362f0f 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp @@ -6,15 +6,15 @@ using namespace NYql::NDqProto; using namespace NKikimrConfig; TTaskResourceEstimation EstimateTaskResources(const TDqTask& task, - const TTableServiceConfig::TResourceManager& config) + const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount) { TTaskResourceEstimation ret; - EstimateTaskResources(task, config, ret); + EstimateTaskResources(task, config, ret, tasksCount); return ret; } void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TResourceManager& config, - TTaskResourceEstimation& ret) + TTaskResourceEstimation& ret, const ui32 tasksCount) { ret.TaskId = task.GetId(); ret.ChannelBuffersCount += task.GetInputs().size() ? 
1 : 0; @@ -30,9 +30,9 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso const auto& opts = task.GetProgram().GetSettings(); if (/* opts.GetHasSort() || */opts.GetHasMapJoin()) { - ret.MkqlProgramMemoryLimit = config.GetMkqlHeavyProgramMemoryLimit(); + ret.MkqlProgramMemoryLimit = config.GetMkqlHeavyProgramMemoryLimit() / tasksCount; } else { - ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit(); + ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit() / tasksCount; } ret.TotalMemoryLimit = ret.ChannelBuffersCount * ret.ChannelBufferMemoryLimit @@ -40,12 +40,12 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso } TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, - const TTableServiceConfig::TResourceManager& config) + const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount) { TVector<TTaskResourceEstimation> ret; ret.resize(tasks.size()); for (ui64 i = 0; i < tasks.size(); ++i) { - EstimateTaskResources(tasks[i], config, ret[i]); + EstimateTaskResources(tasks[i], config, ret[i], tasksCount); } return ret; } diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.h b/ydb/core/kqp/rm_service/kqp_resource_estimation.h index 2cb67518383..9abbf8dcc78 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation.h +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.h @@ -27,12 +27,12 @@ struct TTaskResourceEstimation { }; TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task, - const NKikimrConfig::TTableServiceConfig::TResourceManager& config); + const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount); void EstimateTaskResources(const NYql::NDqProto::TDqTask& task, - const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result); + const NKikimrConfig::TTableServiceConfig::TResourceManager& config, 
TTaskResourceEstimation& result, const ui32 tasksCount); TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks, - const NKikimrConfig::TTableServiceConfig::TResourceManager& config); + const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp index 64bb10bae36..b741dc2220c 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp @@ -31,7 +31,7 @@ Y_UNIT_TEST(TestChannelSize) { auto* output = task.MutableOutputs()->Add(); output->MutableChannels()->Add(); - auto est = EstimateTaskResources(task, config); + auto est = EstimateTaskResources(task, config, 1); UNIT_ASSERT_EQUAL(2, est.ChannelBuffersCount); UNIT_ASSERT_EQUAL(est.ChannelBufferMemoryLimit, config.GetChannelBufferSize()); @@ -41,7 +41,7 @@ Y_UNIT_TEST(TestChannelSize) { input->MutableChannels()->Add(); } - est = EstimateTaskResources(task, config); + est = EstimateTaskResources(task, config, 1); UNIT_ASSERT_EQUAL(2, est.ChannelBuffersCount); UNIT_ASSERT(est.ChannelBufferMemoryLimit == config.GetChannelBufferSize()); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2cab7a8d621..05bd98b896b 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1200,9 +1200,8 @@ message TTableServiceConfig { optional uint32 DSBaseSortScanThreads = 32 [default = 16]; optional uint32 DSBaseJoinScanThreads = 33 [default = 16]; - optional double AggregationHardThreadsKff = 34 [default = 2]; - optional double AggregationSortThreadsKff = 35 [default = 1]; - optional double AggregationJoinThreadsKff = 36 [default = 1]; + optional uint32 AggregationComputeThreads = 37; + } message TExecuterRetriesConfig { diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto 
b/ydb/library/yql/dq/proto/dq_tasks.proto index 708189c4b20..b812255752f 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -33,8 +33,17 @@ message TProgram { bool HasMapJoin = 1; bool HasSort = 2; bool HasUdf = 3; - bool HasAggregation = 4; + bool HasStateAggregation = 4; bool HasFilter = 5; + uint32 StageLevel = 6; + double LevelDataPrediction = 7; + bool HasFinalAggregation = 8; + bool HasTop = 9; + uint32 GroupByKeysCount = 10; + bool HasRangeScan = 11; + bool HasLookup = 12; + double InputDataPrediction = 13; + double OutputDataPrediction = 14; } uint32 RuntimeVersion = 1; |