aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-02-21 10:45:32 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-02-21 10:45:32 +0300
commitf693cb52d2b30cfce40dd0f94cdf4bfa2db01b5e (patch)
treec18918695a7b56a2f4f1ac09440b30fdbc5ae584
parentc898c805447c595e47f23ce17af55d546b67ffb9 (diff)
downloadydb-f693cb52d2b30cfce40dd0f94cdf4bfa2db01b5e.tar.gz
encapsulate tasks calculation logic for kqp
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp18
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.h1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp57
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp2
-rw-r--r--ydb/core/kqp/node_service/kqp_node_ut.cpp65
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/query_compiler/kqp_predictor.cpp174
-rw-r--r--ydb/core/kqp/query_compiler/kqp_predictor.h47
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp38
-rw-r--r--ydb/core/kqp/rm_service/kqp_resource_estimation.cpp14
-rw-r--r--ydb/core/kqp/rm_service/kqp_resource_estimation.h6
-rw-r--r--ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp4
-rw-r--r--ydb/core/protos/config.proto5
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto11
16 files changed, 347 insertions, 98 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 23f6cdfadea..bdedcd082af 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -172,7 +172,8 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {
LOG_E("Not enough resources to execute query locally and no information about other nodes");
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
- "Not enough resources to execute query locally and no information about other nodes (estimation: " + ToString(LocalRunMemoryEst) + ")");
+ "Not enough resources to execute query locally and no information about other nodes (estimation: "
+ + ToString(LocalRunMemoryEst) + ";" + GetEstimationsInfo() + ")");
TlsActivationContext->Send(std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()));
return;
@@ -237,6 +238,17 @@ void TKqpPlanner::ProcessTasksForScanExecuter() {
}
}
+// Builds a short diagnostic string with the task counts used by the local run
+// estimation: the number of compute tasks and the number of main tasks planned
+// for the executer's own node. Format: "ComputeTasks:<n>;NodeTasks:<m>;".
+TString TKqpPlanner::GetEstimationsInfo() const {
+    // A missing entry for the executer's node is reported as zero tasks.
+    size_t localTasksCount = 0;
+    const auto localTasks = MainTasksPerNode.find(ExecuterId.NodeId());
+    if (localTasks != MainTasksPerNode.end()) {
+        localTasksCount = localTasks->second.size();
+    }
+
+    TStringStream out;
+    out << "ComputeTasks:" << ComputeTasks.size() << ";NodeTasks:" << localTasksCount << ";";
+    return out.Str();
+}
+
void TKqpPlanner::PrepareToProcess() {
auto rmConfig = GetKqpResourceManager()->GetConfig();
@@ -249,12 +261,12 @@ void TKqpPlanner::PrepareToProcess() {
LocalRunMemoryEst = 0;
for (size_t i = 0; i < ComputeTasks.size(); ++i) {
- EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i]);
+ EstimateTaskResources(ComputeTasks[i], rmConfig, ResourceEstimations[i], ComputeTasks.size());
LocalRunMemoryEst += ResourceEstimations[i].TotalMemoryLimit;
}
if (auto it = MainTasksPerNode.find(ExecuterId.NodeId()); it != MainTasksPerNode.end()) {
for (size_t i = 0; i < it->second.size(); ++i) {
- EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()]);
+ EstimateTaskResources(it->second[i], rmConfig, ResourceEstimations[i + ComputeTasks.size()], it->second.size());
LocalRunMemoryEst += ResourceEstimations[i + ComputeTasks.size()].TotalMemoryLimit;
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 1fc08d0f266..025a0dcbffe 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -43,6 +43,7 @@ public:
ui32 GetCurrentRetryDelay(ui32 requestId);
private:
void PrepareToProcess();
+ TString GetEstimationsInfo() const;
void RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot);
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index f61622abbcf..db98b839d12 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -12,6 +12,7 @@
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/query_compiler/kqp_predictor.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/runtime/kqp_transport.h>
#include <ydb/core/kqp/opt/kqp_query_plan.h>
@@ -209,25 +210,24 @@ private:
}
};
- ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan) const {
+ ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const {
ui32 result = 0;
+ const auto& stage = GetStage(stageInfo);
if (isOlapScan) {
if (AggregationSettings.HasCSScanMinimalThreads()) {
result = AggregationSettings.GetCSScanMinimalThreads();
} else {
- std::optional<ui32> userPoolSize;
- if (TlsActivationContext && TlsActivationContext->ActorSystem()) {
- userPoolSize = TlsActivationContext->ActorSystem()->GetPoolThreadsCount(AppData()->UserPoolId);
- }
- if (!userPoolSize) {
- ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "user pool is undefined for executer tasks construction";
- userPoolSize = NSystemInfo::NumberOfCpus();
+ TStagePredictor predictor;
+ const ui32 threadsCount = TStagePredictor::GetUsableThreads();
+ if (!predictor.DeserializeFromKqpSettings(stage.GetProgram().GetSettings())) {
+ ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction";
+ return threadsCount;
+ } else {
+ return std::max<ui32>(1, predictor.CalcTasksOptimalCount(threadsCount, {}));
}
- result = *userPoolSize;
}
} else {
result = AggregationSettings.GetDSScanMinimalThreads();
- const auto& stage = GetStage(stageInfo);
if (stage.GetProgram().GetSettings().GetHasSort()) {
result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads());
}
@@ -238,19 +238,19 @@ private:
return Max<ui32>(1, result);
}
- ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount) const {
- ui32 result = Max<ui32>(1, previousTasksCount / 2);
- const auto& stage = GetStage(stageInfo);
- if (stage.GetProgram().GetSettings().GetHasAggregation() && !stage.GetProgram().GetSettings().GetHasFilter()) {
- result *= AggregationSettings.GetAggregationHardThreadsKff();
- }
- if (stage.GetProgram().GetSettings().GetHasSort()) {
- result *= AggregationSettings.GetAggregationSortThreadsKff();
- }
- if (stage.GetProgram().GetSettings().GetHasMapJoin()) {
- result *= AggregationSettings.GetAggregationJoinThreadsKff();
+ ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount, const ui32 nodesCount) const {
+ if (AggregationSettings.HasAggregationComputeThreads()) {
+ return std::max<ui32>(1, AggregationSettings.GetAggregationComputeThreads());
+ } else {
+ const auto& stage = GetStage(stageInfo);
+ TStagePredictor predictor;
+ if (!predictor.DeserializeFromKqpSettings(stage.GetProgram().GetSettings())) {
+ ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction";
+ return std::max<ui32>(1, previousTasksCount * 0.75);
+ } else {
+ return predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount) * nodesCount;
+ }
}
- return result;
}
TTask& AssignTaskToShard(
@@ -268,8 +268,7 @@ private:
auto& tasks = nodeTasks[nodeId];
auto& cnt = assignedShardsCount[nodeId];
-
- const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo, isOlapScan);
+ const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo, isOlapScan, nodeId);
if (cnt < maxScansPerNode) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.NodeId = nodeId;
@@ -286,7 +285,6 @@ private:
void BuildScanTasks(TStageInfo& stageInfo) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
-
auto& stage = GetStage(stageInfo);
const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId);
@@ -384,6 +382,8 @@ private:
auto& stage = GetStage(stageInfo);
ui32 partitionsCount = 1;
+ ui32 inputTasks = 0;
+ bool isShuffle = false;
for (ui32 inputIndex = 0; inputIndex < stage.InputsSize(); ++inputIndex) {
const auto& input = stage.GetInputs(inputIndex);
@@ -411,7 +411,8 @@ private:
switch (input.GetTypeCase()) {
case NKqpProto::TKqpPhyConnection::kHashShuffle: {
- partitionsCount = Max<ui32>(1, GetMaxTasksAggregation(stageInfo, originStageInfo.Tasks.size()));
+ inputTasks += originStageInfo.Tasks.size();
+ isShuffle = true;
break;
}
@@ -425,6 +426,10 @@ private:
}
}
+ if (isShuffle) {
+ partitionsCount = std::max(partitionsCount, GetMaxTasksAggregation(stageInfo, inputTasks, ShardsOnNode.size()));
+ }
+
for (ui32 i = 0; i < partitionsCount; ++i) {
auto& task = TasksGraph.AddTask(stageInfo);
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 0fd6b6a3a15..7c4bf8787fc 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -149,7 +149,7 @@ private:
ui32 requestChannels = 0;
for (auto& dqTask : *msg.MutableTasks()) {
- auto estimation = EstimateTaskResources(dqTask, Config);
+ auto estimation = EstimateTaskResources(dqTask, Config, msg.GetTasks().size());
LOG_D("Resource estimation complete"
<< ", TxId: " << txId << ", task id: " << dqTask.GetId() << ", node id: " << SelfId().NodeId()
<< ", estimated resources: " << estimation.ToString());
diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp
index 273c301fc9b..88a852aed7f 100644
--- a/ydb/core/kqp/node_service/kqp_node_ut.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp
@@ -270,7 +270,11 @@ void KqpNode::CommonCase() {
TActorId sender1 = Runtime->AllocateEdgeActor();
TActorId sender2 = Runtime->AllocateEdgeActor();
- const ui64 taskSize = 1'000 + 2 * 10;
+ const ui64 additionalSize = 2 * 10;
+ const ui64 fullMkqlLimit = 1'000;
+// const ui64 taskSize = fullMkqlLimit + additionalSize;
+ const ui64 tasksSize12 = fullMkqlLimit + 2 * additionalSize;
+ const ui64 tasksSize22 = 2 * tasksSize12; //for 2 requests
// first request
SendStartTasksRequest(sender1, /* txId */ 1, /* taskIds */ {1, 2});
@@ -292,9 +296,9 @@ void KqpNode::CommonCase() {
UNIT_ASSERT_VALUES_EQUAL(10'000, memoryLimits.MkqlHeavyProgramMemoryLimit);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2);
- UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize);
+ UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), tasksSize12);
- AssertResourceBrokerSensors(0, 2 * taskSize, 0, 0, 2);
+ AssertResourceBrokerSensors(0, tasksSize12, 0, 0, 2);
}
Runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
@@ -319,7 +323,7 @@ void KqpNode::CommonCase() {
UNIT_ASSERT_VALUES_EQUAL(98, payload.GetExecutionUnits());
UNIT_ASSERT_VALUES_EQUAL(1, payload.GetMemory().size());
UNIT_ASSERT_VALUES_EQUAL((ui32) NRm::EKqpMemoryPool::ScanQuery, payload.GetMemory()[0].GetPool());
- UNIT_ASSERT_VALUES_EQUAL(cfg.GetResourceManager().GetQueryMemoryLimit() - 2 * taskSize,
+ UNIT_ASSERT_VALUES_EQUAL(cfg.GetResourceManager().GetQueryMemoryLimit() - tasksSize12,
payload.GetMemory()[0].GetAvailable());
}
@@ -335,7 +339,7 @@ void KqpNode::CommonCase() {
UNIT_ASSERT_EQUAL(NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR, notStartedTask.GetReason());
}
- AssertResourceBrokerSensors(0, 2 * taskSize, 0, 0, 2);
+ AssertResourceBrokerSensors(0, tasksSize12, 0, 0, 2);
}
// second request
@@ -355,9 +359,9 @@ void KqpNode::CommonCase() {
UNIT_ASSERT(CompFactory->Task2Actor.contains(4));
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 4);
- UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 4 * taskSize);
+ UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), tasksSize22);
- AssertResourceBrokerSensors(0, 4 * taskSize, 0, 0, 4);
+ AssertResourceBrokerSensors(0, tasksSize22, 0, 0, 4);
}
// request extra resources for taskId 4
@@ -369,8 +373,8 @@ void KqpNode::CommonCase() {
UNIT_ASSERT(allocated);
DispatchKqpNodePostponedEvents(sender1);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 4);
- UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 4 * taskSize + 100);
- AssertResourceBrokerSensors(0, 4 * taskSize + 100, 0, 1, 4);
+ UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), tasksSize22 + 100);
+ AssertResourceBrokerSensors(0, tasksSize22 + 100, 0, 1, 4);
}
// complete tasks
@@ -383,9 +387,10 @@ void KqpNode::CommonCase() {
SendFinishTask(mockCA.ActorId, taskId < 3 ? 1 : 2, taskId);
{
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), (i64) CompFactory->Task2Actor.size());
- UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), (i64) (4 - taskId) * taskSize + extraMem);
+ const ui64 tasksMemMkql = fullMkqlLimit * (4 - taskId) / 2;
+ UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), (i64) (4 - taskId) * additionalSize + extraMem + tasksMemMkql);
- AssertResourceBrokerSensors(0, (4 - taskId) * taskSize + extraMem, 0, 1 + taskId, 4 - taskId);
+ AssertResourceBrokerSensors(0, (4 - taskId) * additionalSize + extraMem + tasksMemMkql, 0, 1 + taskId, 4 - taskId);
}
}
@@ -399,7 +404,9 @@ void KqpNode::ExtraAllocation() {
TActorId sender1 = Runtime->AllocateEdgeActor();
- const ui64 taskSize = 1'000 + 2 * 10;
+ const ui64 mkqlLimit = 1'000;
+ const ui64 additionalSize = 2 * 10;
+ const ui64 taskSize2 = mkqlLimit + additionalSize * 2;
SendStartTasksRequest(sender1, /* txId */ 1, /* taskIds */ {1, 2});
Runtime->GrabEdgeEvent<TEvKqpNode::TEvStartKqpTasksResponse>(sender1);
@@ -415,8 +422,8 @@ void KqpNode::ExtraAllocation() {
DispatchKqpNodePostponedEvents(sender1);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2);
- UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize + 100);
- AssertResourceBrokerSensors(0, 2 * taskSize + 100, 0, 1, 2);
+ UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), taskSize2 + 100);
+ AssertResourceBrokerSensors(0, taskSize2 + 100, 0, 1, 2);
}
// too big request
@@ -428,10 +435,10 @@ void KqpNode::ExtraAllocation() {
DispatchKqpNodePostponedEvents(sender1);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2);
- UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize + 100);
+ UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), taskSize2 + 100);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughMemory->Val(), 1);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0);
- AssertResourceBrokerSensors(0, 2 * taskSize + 100, 0, 1, 2);
+ AssertResourceBrokerSensors(0, taskSize2 + 100, 0, 1, 2);
}
}
@@ -462,7 +469,9 @@ void KqpNode::NotEnoughMemory_Extra() {
TActorId sender1 = Runtime->AllocateEdgeActor();
- const ui64 taskSize = 1'000 + 2 * 10;
+ const ui64 mkqlLimit = 1'000;
+ const ui64 additionalSize = 2 * 10;
+ const ui64 taskSize2 = mkqlLimit + additionalSize * 2;
// first request
SendStartTasksRequest(sender1, /* txId */ (ui64)1, /* taskIds */ {1, 2});
@@ -484,9 +493,9 @@ void KqpNode::NotEnoughMemory_Extra() {
UNIT_ASSERT_VALUES_EQUAL(10'000, memoryLimits.MkqlHeavyProgramMemoryLimit);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2);
- UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize);
+ UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), taskSize2);
- AssertResourceBrokerSensors(0, 2 * taskSize, 0, 0, 2);
+ AssertResourceBrokerSensors(0, taskSize2, 0, 0, 2);
}
{
@@ -500,11 +509,11 @@ void KqpNode::NotEnoughMemory_Extra() {
DispatchKqpNodePostponedEvents(sender1);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 2);
- UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 2 * taskSize);
+ UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), taskSize2);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughMemory->Val(), 1);
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughComputeActors->Val(), 0);
- AssertResourceBrokerSensors(0, 2 * taskSize, 0, 0, 2);
+ AssertResourceBrokerSensors(0, taskSize2, 0, 0, 2);
}
void KqpNode::NotEnoughComputeActors() {
@@ -532,7 +541,7 @@ void KqpNode::NotEnoughComputeActors() {
void KqpNode::ResourceBrokerNotEnoughResources() {
auto cfg = MakeKqpResourceManagerConfig();
cfg.MutableResourceManager()->SetChannelBufferSize(6'000);
- cfg.MutableResourceManager()->SetQueryMemoryLimit(100'000);
+ cfg.MutableResourceManager()->SetQueryMemoryLimit(49'000);
CreateKqpNode(cfg);
TActorId sender1 = Runtime->AllocateEdgeActor();
@@ -542,7 +551,7 @@ void KqpNode::ResourceBrokerNotEnoughResources() {
{
auto answer = Runtime->GrabEdgeEvent<TEvKqpNode::TEvStartKqpTasksResponse>(sender1);
Y_UNUSED(answer);
- AssertResourceBrokerSensors(0, 26'000, 0, 0, 2);
+ AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 0, 2);
}
SendStartTasksRequest(sender2, 2, {3, 4});
@@ -557,13 +566,13 @@ void KqpNode::ResourceBrokerNotEnoughResources() {
}
}
- AssertResourceBrokerSensors(0, 26'000, 0, 1, 2);
+ AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 1, 2);
}
void KqpNode::ResourceBrokerNotEnoughResources_Extra() {
auto cfg = MakeKqpResourceManagerConfig();
cfg.MutableResourceManager()->SetChannelBufferSize(6'000);
- cfg.MutableResourceManager()->SetQueryMemoryLimit(100'000);
+ cfg.MutableResourceManager()->SetQueryMemoryLimit(49'000);
CreateKqpNode(cfg);
TActorId sender1 = Runtime->AllocateEdgeActor();
@@ -572,18 +581,18 @@ void KqpNode::ResourceBrokerNotEnoughResources_Extra() {
{
auto answer = Runtime->GrabEdgeEvent<TEvKqpNode::TEvStartKqpTasksResponse>(sender1);
Y_UNUSED(answer);
- AssertResourceBrokerSensors(0, 26'000, 0, 0, 2);
+ AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 0, 2);
}
{
NKikimr::TActorSystemStub stub;
auto& task1ExtraAlloc = CompFactory->Task2Actor[1].MemoryLimits.AllocateMemoryFn;
- bool allocated = task1ExtraAlloc((ui64)1, 1, 26'000);
+ bool allocated = task1ExtraAlloc((ui64)1, 1, 2 * (6000 * 2 + 1000 / 2));
UNIT_ASSERT(!allocated);
}
- AssertResourceBrokerSensors(0, 26'000, 0, 0, 2);
+ AssertResourceBrokerSensors(0, 2 * (6000 * 2 + 1000 / 2), 0, 0, 2);
}
void KqpNode::ExecuterLost() {
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt b/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt
index c848e589bdd..18165e775a3 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt
@@ -23,4 +23,5 @@ target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp
)
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
index bd6045c8f04..731299f3f56 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
@@ -24,4 +24,5 @@ target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp
)
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux.txt
index bd6045c8f04..731299f3f56 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.linux.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.linux.txt
@@ -24,4 +24,5 @@ target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp
)
diff --git a/ydb/core/kqp/query_compiler/kqp_predictor.cpp b/ydb/core/kqp/query_compiler/kqp_predictor.cpp
new file mode 100644
index 00000000000..293ad98e812
--- /dev/null
+++ b/ydb/core/kqp/query_compiler/kqp_predictor.cpp
@@ -0,0 +1,174 @@
+#include "kqp_predictor.h"
+#include <ydb/core/kqp/opt/kqp_opt.h>
+#include <ydb/core/base/appdata.h>
+#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <util/system/info.h>
+
+namespace NKikimr::NKqp {
+
+// Derives the relative input/output data volume predictions of the stage from the
+// flags collected by Scan() and from the output predictions of its input stages.
+void TStagePredictor::Prepare() {
+    // Starting volume: 0.5 for a lookup, 1 for a range scan or for a stage without
+    // inputs; otherwise the prediction consists only of the inputs' volumes.
+    double inputVolume = 1;
+    if (HasLookupFlag) {
+        inputVolume = 0.5;
+    } else if (!HasRangeScanFlag && !InputDataVolumes.empty()) {
+        inputVolume = 0;
+    }
+    for (const double volume : InputDataVolumes) {
+        inputVolume += volume;
+    }
+    InputDataPrediction = inputVolume;
+
+    // A top, or an aggregation without group-by keys, is predicted to keep 1% of the data;
+    // everything else is predicted to pass the input volume through unchanged.
+    const bool hasCombiner = HasStateCombinerFlag || HasFinalCombinerFlag;
+    const bool reducesData = HasTopFlag || (hasCombiner && !GroupByKeysCount);
+    OutputDataPrediction = reducesData ? InputDataPrediction * 0.01 : InputDataPrediction;
+}
+
+// Walks the expression tree of the stage program and raises the predictor flags
+// for the kinds of nodes it meets; for a wide combiner it also stores the number
+// of group-by keys. The checks are ordered: the first matching kind handles the node.
+// The visitor callback returns true for every node.
+void TStagePredictor::Scan(const NYql::TExprNode::TPtr& stageNode) {
+    NYql::VisitExpr(stageNode, [&](const NYql::TExprNode::TPtr& exprNode) {
+        NYql::NNodes::TExprBase node(exprNode);
+
+        if (node.Maybe<NYql::NNodes::TKqpWideReadTable>()) {
+            HasRangeScanFlag = true;
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TKqpLookupTable>()) {
+            HasLookupFlag = true;
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TKqpUpsertRows>() || node.Maybe<NYql::NNodes::TKqpDeleteRows>()) {
+            // Write operations are recognized but do not change any flag.
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TKqpWideReadTableRanges>() || node.Maybe<NYql::NNodes::TKqpWideReadOlapTableRanges>()) {
+            HasRangeScanFlag = true;
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TCoSort>()) {
+            HasSortFlag = true;
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TCoKeepTop>() || node.Maybe<NYql::NNodes::TCoTop>() || node.Maybe<NYql::NNodes::TCoWideTop>()) {
+            HasTopFlag = true;
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TCoTopSort>() || node.Maybe<NYql::NNodes::TCoWideTopSort>()) {
+            HasTopFlag = true;
+            HasSortFlag = true;
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TCoFilterBase>()) {
+            HasFilterFlag = true;
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TCoWideCombiner>()) {
+            auto combiner = node.Cast<NYql::NNodes::TCoWideCombiner>();
+            // NOTE(review): presumably one child of the key extractor is not a key,
+            // hence the "- 1" - confirm against the TCoWideCombiner node layout.
+            GroupByKeysCount = combiner.KeyExtractor().Ptr()->ChildrenSize() - 1;
+            // A combiner with a non-empty MemLimit is recorded as final, otherwise as state.
+            if (combiner.MemLimit() != "") {
+                HasFinalCombinerFlag = true;
+            } else {
+                HasStateCombinerFlag = true;
+            }
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TCoMapJoinCore>()) {
+            HasMapJoinFlag = true;
+            return true;
+        }
+        if (node.Maybe<NYql::NNodes::TCoUdf>()) {
+            HasUdfFlag = true;
+        }
+        return true;
+    });
+}
+
+// Registers an input stage: this stage gets a level at least one greater than the
+// input's level, and the input's predicted output volume is remembered for Prepare().
+void TStagePredictor::AcceptInputStageInfo(const TStagePredictor& info, const NYql::NNodes::TDqConnection& /*connection*/) {
+    const ui32 levelAfterInput = info.StageLevel + 1;
+    if (StageLevel < levelAfterInput) {
+        StageLevel = levelAfterInput;
+    }
+    InputDataVolumes.push_back(info.GetOutputDataPrediction());
+}
+
+// Stores the collected flags and predictions into the program settings proto.
+// LevelDataPrediction is written as 1 when it has not been set.
+void TStagePredictor::SerializeToKqpSettings(NYql::NDqProto::TProgram::TSettings& kqpProto) const {
+    // Operations detected in the stage program.
+    kqpProto.SetHasRangeScan(HasRangeScanFlag);
+    kqpProto.SetHasLookup(HasLookupFlag);
+    kqpProto.SetHasFilter(HasFilterFlag);
+    kqpProto.SetHasSort(HasSortFlag);
+    kqpProto.SetHasTop(HasTopFlag);
+    kqpProto.SetHasMapJoin(HasMapJoinFlag);
+    kqpProto.SetHasUdf(HasUdfFlag);
+
+    // Aggregation details.
+    kqpProto.SetHasStateAggregation(HasStateCombinerFlag);
+    kqpProto.SetHasFinalAggregation(HasFinalCombinerFlag);
+    kqpProto.SetGroupByKeysCount(GroupByKeysCount);
+
+    // Stage level and data volume predictions.
+    kqpProto.SetStageLevel(StageLevel);
+    kqpProto.SetInputDataPrediction(InputDataPrediction);
+    kqpProto.SetOutputDataPrediction(OutputDataPrediction);
+    const double levelVolume = LevelDataPrediction ? *LevelDataPrediction : 1;
+    kqpProto.SetLevelDataPrediction(levelVolume);
+}
+
+// Restores the flags and predictions from the program settings proto.
+// Always returns true: the current implementation has no failure path.
+bool TStagePredictor::DeserializeFromKqpSettings(const NYql::NDqProto::TProgram::TSettings& kqpProto) {
+    // Operations detected in the stage program.
+    HasRangeScanFlag = kqpProto.GetHasRangeScan();
+    HasLookupFlag = kqpProto.GetHasLookup();
+    HasFilterFlag = kqpProto.GetHasFilter();
+    HasSortFlag = kqpProto.GetHasSort();
+    HasTopFlag = kqpProto.GetHasTop();
+    HasMapJoinFlag = kqpProto.GetHasMapJoin();
+    HasUdfFlag = kqpProto.GetHasUdf();
+
+    // Aggregation details.
+    HasStateCombinerFlag = kqpProto.GetHasStateAggregation();
+    HasFinalCombinerFlag = kqpProto.GetHasFinalAggregation();
+    GroupByKeysCount = kqpProto.GetGroupByKeysCount();
+
+    // Stage level and data volume predictions.
+    StageLevel = kqpProto.GetStageLevel();
+    InputDataPrediction = kqpProto.GetInputDataPrediction();
+    OutputDataPrediction = kqpProto.GetOutputDataPrediction();
+    LevelDataPrediction = kqpProto.GetLevelDataPrediction();
+    return true;
+}
+
+// Returns the number of threads considered usable for tasks: the size of the user
+// pool of the current actor system when it can be obtained, otherwise the number
+// of CPUs (with an error logged). The result is never less than 1.
+ui32 TStagePredictor::GetUsableThreads() {
+    std::optional<ui32> poolThreads;
+    if (TlsActivationContext) {
+        if (auto actorSystem = TlsActivationContext->ActorSystem()) {
+            poolThreads = actorSystem->GetPoolThreadsCount(AppData()->UserPoolId);
+        }
+    }
+
+    if (!poolThreads) {
+        ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "user pool is undefined for executer tasks construction";
+        poolThreads = NSystemInfo::NumberOfCpus();
+    }
+    return Max<ui32>(1, *poolThreads);
+}
+
+// Estimates how many tasks the stage should be split into.
+//   availableThreadsCount   - number of threads that may run tasks of this stage;
+//   previousStageTasksCount - number of tasks feeding this stage, if known; when set,
+//                             it also caps the result.
+// The thread budget, reduced by a quarter of the feeding tasks, is scaled by the share
+// of this stage in the data volume of its level (InputDataPrediction / LevelDataPrediction).
+// When the level volume is unknown or zero, all available threads are used as the estimation.
+// The returned value is never less than 1.
+ui32 TStagePredictor::CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount) const {
+    ui32 result = 0;
+    if (!LevelDataPrediction || *LevelDataPrediction == 0) {
+        ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "level difficult not defined for correct calculation";
+        result = availableThreadsCount;
+    } else {
+        const double threadsBudget = availableThreadsCount - previousStageTasksCount.value_or(0) * 0.25;
+        const double estimation = threadsBudget * (InputDataPrediction / *LevelDataPrediction);
+        // The budget is negative when the previous stage has more than four times as many
+        // tasks as there are threads. Converting a double that does not fit into ui32 is
+        // undefined behaviour, so the estimation is clamped into the ui32 range first
+        // (a non-positive or NaN estimation becomes 0 and is raised to one task below).
+        const ui32 maxTasksCount = ~static_cast<ui32>(0);
+        if (!(estimation > 0)) {
+            result = 0;
+        } else if (estimation >= static_cast<double>(maxTasksCount)) {
+            result = maxTasksCount;
+        } else {
+            result = static_cast<ui32>(estimation);
+        }
+    }
+    if (previousStageTasksCount) {
+        result = std::min<ui32>(result, *previousStageTasksCount);
+    }
+    return std::max<ui32>(1, result);
+}
+
+// Creates the predictor for the given stage, registers it under the stage's unique id,
+// scans the stage program, links the predictor with the predictors of its input stages
+// and finally computes its data volume predictions.
+// Every stage referenced through a connection must have been built before; otherwise
+// YQL_ENSURE fails. The program is already scanned here, so callers do not need to
+// call Scan() on the returned predictor again.
+TStagePredictor& TRequestPredictor::BuildForStage(const NYql::NNodes::TDqPhyStage& stage, NYql::TExprContext& ctx) {
+    TStagePredictor& predictor = StagePredictors.emplace_back();
+    StagesMap.emplace(stage.Ref().UniqueId(), &predictor);
+    predictor.Scan(stage.Program().Ptr());
+
+    for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) {
+        const auto& input = stage.Inputs().Item(inputIndex);
+
+        // Sources are not produced by another stage, there is nothing to accept from them.
+        if (input.Maybe<NYql::NNodes::TDqSource>()) {
+            continue;
+        }
+
+        YQL_ENSURE(input.Maybe<NYql::NNodes::TDqConnection>());
+        auto connection = input.Cast<NYql::NNodes::TDqConnection>();
+        auto inputPredictor = StagesMap.find(connection.Output().Stage().Ref().UniqueId());
+        YQL_ENSURE(inputPredictor != StagesMap.end(), "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map for prediction: "
+            << PrintKqpStageOnly(connection.Output().Stage(), ctx));
+        predictor.AcceptInputStageInfo(*inputPredictor->second, connection);
+    }
+    predictor.Prepare();
+    return predictor;
+}
+
+// Returns the sum of the input data predictions of all built stages
+// whose stage level equals the given one (0 when there are none).
+double TRequestPredictor::GetLevelDataVolume(const ui32 level) const {
+    double levelVolume = 0;
+    for (const TStagePredictor& predictor : StagePredictors) {
+        if (predictor.GetStageLevel() != level) {
+            continue;
+        }
+        levelVolume += predictor.GetInputDataPrediction();
+    }
+    return levelVolume;
+}
+
+}
diff --git a/ydb/core/kqp/query_compiler/kqp_predictor.h b/ydb/core/kqp/query_compiler/kqp_predictor.h
new file mode 100644
index 00000000000..8195583876f
--- /dev/null
+++ b/ydb/core/kqp/query_compiler/kqp_predictor.h
@@ -0,0 +1,47 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
+
+namespace NKikimr::NKqp {
+class TStagePredictor { // collects per-stage operation flags and relative data volume predictions used to choose the tasks count
+private:
+ YDB_READONLY_FLAG(HasFinalCombiner, false); // raised by Scan() for a wide combiner with a non-empty MemLimit
+ YDB_READONLY_FLAG(HasStateCombiner, false); // raised by Scan() for a wide combiner with an empty MemLimit
+ YDB_READONLY(ui32, GroupByKeysCount, 0); // set by Scan() from the key extractor of a wide combiner (children count minus one)
+
+ YDB_READONLY_FLAG(HasSort, false); // the remaining flags are raised by Scan() for the matching node kinds
+ YDB_READONLY_FLAG(HasMapJoin, false);
+ YDB_READONLY_FLAG(HasUdf, false);
+ YDB_READONLY_FLAG(HasFilter, false);
+ YDB_READONLY_FLAG(HasTop, false);
+ YDB_READONLY_FLAG(HasRangeScan, false);
+ YDB_READONLY_FLAG(HasLookup, false);
+
+ YDB_READONLY(double, InputDataPrediction, 1); // relative input volume, computed by Prepare()
+ YDB_READONLY(double, OutputDataPrediction, 1); // relative output volume, computed by Prepare()
+ YDB_OPT(double, LevelDataPrediction); // data volume of the stage's level; assigned by DeserializeFromKqpSettings()
+ YDB_READONLY(ui32, StageLevel, 0); // 0 until an input is accepted, then the maximum input level plus one
+ std::vector<double> InputDataVolumes; // output predictions of the accepted input stages
+ void Prepare(); // computes InputDataPrediction and OutputDataPrediction; called by TRequestPredictor::BuildForStage()
+ friend class TRequestPredictor; // calls the private Prepare()
+public:
+ void Scan(const NYql::TExprNode::TPtr& stageNode); // raises the flags for the node kinds found in the stage program
+ void AcceptInputStageInfo(const TStagePredictor& info, const NYql::NNodes::TDqConnection& connection); // updates StageLevel and remembers the input's output prediction
+ void SerializeToKqpSettings(NYql::NDqProto::TProgram::TSettings& kqpProto) const; // writes flags and predictions; an unset LevelDataPrediction is written as 1
+ bool DeserializeFromKqpSettings(const NYql::NDqProto::TProgram::TSettings& kqpProto); // reads flags and predictions; currently always returns true
+ static ui32 GetUsableThreads(); // user pool size of the current actor system, or the CPU count as a fallback; at least 1
+
+ ui32 CalcTasksOptimalCount(const ui32 availableThreadsCount, const std::optional<ui32> previousStageTasksCount) const; // tasks count estimation, at least 1, capped by previousStageTasksCount when it is set
+};
+
+class TRequestPredictor { // owns the stage predictors and links each stage with its input stages
+private:
+ std::deque<TStagePredictor> StagePredictors; // storage of the predictors; StagesMap holds pointers to its elements
+ std::map<ui64, TStagePredictor*> StagesMap; // stage node UniqueId -> predictor built for that stage
+public:
+ double GetLevelDataVolume(const ui32 level) const; // sum of InputDataPrediction over the stages with the given StageLevel
+ TStagePredictor& BuildForStage(const NYql::NNodes::TDqPhyStage& stage, NYql::TExprContext& ctx); // creates, scans and prepares the predictor; input stages must have been built before
+};
+
+}
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 4a25ca0abe9..db64a5d2325 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -1,4 +1,5 @@
#include "kqp_query_compiler.h"
+#include "kqp_predictor.h"
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/query_compiler/kqp_mkql_compiler.h>
@@ -523,7 +524,7 @@ public:
[](const TItemExprType* first, const TItemExprType* second) {
return first->GetName() < second->GetName();
});
- inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(),
+ inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(),
[](const TItemExprType* first, const TItemExprType* second) {
return first->GetName() == second->GetName();
}),
@@ -543,10 +544,13 @@ private:
}
void CompileStage(const TDqPhyStage& stage, NKqpProto::TKqpPhyStage& stageProto, TExprContext& ctx,
- const TMap<ui64, ui32>& stagesMap, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
+ const TMap<ui64, ui32>& stagesMap, TRequestPredictor& rPredictor, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
{
stageProto.SetIsEffectsStage(NOpt::IsKqpEffectsStage(stage));
+ TStagePredictor& stagePredictor = rPredictor.BuildForStage(stage, ctx);
+ stagePredictor.Scan(stage.Program().Ptr());
+
for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) {
const auto& input = stage.Inputs().Item(inputIndex);
@@ -564,11 +568,6 @@ private:
}
}
- bool hasSort = false;
- bool hasMapJoin = false;
- bool hasUdf = false;
- bool hasFilter = false;
- bool hasWideCombiner = false;
VisitExpr(stage.Program().Ptr(), [&](const TExprNode::TPtr& exprNode) {
TExprBase node(exprNode);
if (auto maybeReadTable = node.Maybe<TKqpWideReadTable>()) {
@@ -652,20 +651,9 @@ private:
FillOlapProgram(readTableRanges.Process(), miniKqlResultType, *tableMeta, *tableOp.MutableReadOlapRange());
FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange());
tableOp.MutableReadOlapRange()->SetReadType(NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS);
- } else if (node.Maybe<TCoSort>()) {
- hasSort = true;
- } else if (node.Maybe<TCoFilterBase>()) {
- hasFilter = true;
- } else if (node.Maybe<TCoWideCombiner>()) {
- hasWideCombiner = true;
- } else if (node.Maybe<TCoMapJoinCore>()) {
- hasMapJoin = true;
- } else if (node.Maybe<TCoUdf>()) {
- hasUdf = true;
} else {
YQL_ENSURE(!node.Maybe<TKqpReadTable>());
}
-
return true;
});
@@ -695,11 +683,8 @@ private:
auto& programProto = *stageProto.MutableProgram();
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
programProto.SetRaw(programBytecode);
- programProto.MutableSettings()->SetHasMapJoin(hasMapJoin);
- programProto.MutableSettings()->SetHasSort(hasSort);
- programProto.MutableSettings()->SetHasUdf(hasUdf);
- programProto.MutableSettings()->SetHasAggregation(hasWideCombiner);
- programProto.MutableSettings()->SetHasFilter(hasFilter);
+
+ stagePredictor.SerializeToKqpSettings(*programProto.MutableSettings());
for (auto member : paramsType->GetItems()) {
auto paramName = TString(member->GetName());
@@ -720,12 +705,17 @@ private:
TMap<ui64, ui32> stagesMap;
THashMap<TStringBuf, THashSet<TStringBuf>> tablesMap;
+ TRequestPredictor rPredictor;
for (const auto& stage : tx.Stages()) {
auto* protoStage = txProto.AddStages();
- CompileStage(stage, *protoStage, ctx, stagesMap, tablesMap);
+ CompileStage(stage, *protoStage, ctx, stagesMap, rPredictor, tablesMap);
hasEffectStage |= protoStage->GetIsEffectsStage();
stagesMap[stage.Ref().UniqueId()] = txProto.StagesSize() - 1;
}
+ for (auto&& i : *txProto.MutableStages()) {
+ i.MutableProgram()->MutableSettings()->SetLevelDataPrediction(rPredictor.GetLevelDataVolume(i.GetProgram().GetSettings().GetStageLevel()));
+ }
+
YQL_ENSURE(hasEffectStage == txSettings.WithEffects);
diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
index cd96733a969..ff279362f0f 100644
--- a/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
+++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.cpp
@@ -6,15 +6,15 @@ using namespace NYql::NDqProto;
using namespace NKikimrConfig;
TTaskResourceEstimation EstimateTaskResources(const TDqTask& task,
- const TTableServiceConfig::TResourceManager& config)
+ const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount)
{
TTaskResourceEstimation ret;
- EstimateTaskResources(task, config, ret);
+ EstimateTaskResources(task, config, ret, tasksCount);
return ret;
}
void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TResourceManager& config,
- TTaskResourceEstimation& ret)
+ TTaskResourceEstimation& ret, const ui32 tasksCount)
{
ret.TaskId = task.GetId();
ret.ChannelBuffersCount += task.GetInputs().size() ? 1 : 0;
@@ -30,9 +30,9 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso
const auto& opts = task.GetProgram().GetSettings();
if (/* opts.GetHasSort() || */opts.GetHasMapJoin()) {
- ret.MkqlProgramMemoryLimit = config.GetMkqlHeavyProgramMemoryLimit();
+ ret.MkqlProgramMemoryLimit = config.GetMkqlHeavyProgramMemoryLimit() / tasksCount;
} else {
- ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit();
+ ret.MkqlProgramMemoryLimit = config.GetMkqlLightProgramMemoryLimit() / tasksCount;
}
ret.TotalMemoryLimit = ret.ChannelBuffersCount * ret.ChannelBufferMemoryLimit
@@ -40,12 +40,12 @@ void EstimateTaskResources(const TDqTask& task, const TTableServiceConfig::TReso
}
TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks,
- const TTableServiceConfig::TResourceManager& config)
+ const TTableServiceConfig::TResourceManager& config, const ui32 tasksCount)
{
TVector<TTaskResourceEstimation> ret;
ret.resize(tasks.size());
for (ui64 i = 0; i < tasks.size(); ++i) {
- EstimateTaskResources(tasks[i], config, ret[i]);
+ EstimateTaskResources(tasks[i], config, ret[i], tasksCount);
}
return ret;
}
diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation.h b/ydb/core/kqp/rm_service/kqp_resource_estimation.h
index 2cb67518383..9abbf8dcc78 100644
--- a/ydb/core/kqp/rm_service/kqp_resource_estimation.h
+++ b/ydb/core/kqp/rm_service/kqp_resource_estimation.h
@@ -27,12 +27,12 @@ struct TTaskResourceEstimation {
};
TTaskResourceEstimation EstimateTaskResources(const NYql::NDqProto::TDqTask& task,
- const NKikimrConfig::TTableServiceConfig::TResourceManager& config);
+ const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount);
void EstimateTaskResources(const NYql::NDqProto::TDqTask& task,
- const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result);
+ const NKikimrConfig::TTableServiceConfig::TResourceManager& config, TTaskResourceEstimation& result, const ui32 tasksCount);
TVector<TTaskResourceEstimation> EstimateTasksResources(const TVector<NYql::NDqProto::TDqTask>& tasks,
- const NKikimrConfig::TTableServiceConfig::TResourceManager& config);
+ const NKikimrConfig::TTableServiceConfig::TResourceManager& config, const ui32 tasksCount);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp b/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp
index 64bb10bae36..b741dc2220c 100644
--- a/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp
+++ b/ydb/core/kqp/rm_service/kqp_resource_estimation_ut.cpp
@@ -31,7 +31,7 @@ Y_UNIT_TEST(TestChannelSize) {
auto* output = task.MutableOutputs()->Add();
output->MutableChannels()->Add();
- auto est = EstimateTaskResources(task, config);
+ auto est = EstimateTaskResources(task, config, 1);
UNIT_ASSERT_EQUAL(2, est.ChannelBuffersCount);
UNIT_ASSERT_EQUAL(est.ChannelBufferMemoryLimit, config.GetChannelBufferSize());
@@ -41,7 +41,7 @@ Y_UNIT_TEST(TestChannelSize) {
input->MutableChannels()->Add();
}
- est = EstimateTaskResources(task, config);
+ est = EstimateTaskResources(task, config, 1);
UNIT_ASSERT_EQUAL(2, est.ChannelBuffersCount);
UNIT_ASSERT(est.ChannelBufferMemoryLimit == config.GetChannelBufferSize());
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 2cab7a8d621..05bd98b896b 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1200,9 +1200,8 @@ message TTableServiceConfig {
optional uint32 DSBaseSortScanThreads = 32 [default = 16];
optional uint32 DSBaseJoinScanThreads = 33 [default = 16];
- optional double AggregationHardThreadsKff = 34 [default = 2];
- optional double AggregationSortThreadsKff = 35 [default = 1];
- optional double AggregationJoinThreadsKff = 36 [default = 1];
+ optional uint32 AggregationComputeThreads = 37;
+
}
message TExecuterRetriesConfig {
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index 708189c4b20..b812255752f 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -33,8 +33,17 @@ message TProgram {
bool HasMapJoin = 1;
bool HasSort = 2;
bool HasUdf = 3;
- bool HasAggregation = 4;
+ bool HasStateAggregation = 4;
bool HasFilter = 5;
+ uint32 StageLevel = 6;
+ double LevelDataPrediction = 7;
+ bool HasFinalAggregation = 8;
+ bool HasTop = 9;
+ uint32 GroupByKeysCount = 10;
+ bool HasRangeScan = 11;
+ bool HasLookup = 12;
+ double InputDataPrediction = 13;
+ double OutputDataPrediction = 14;
}
uint32 RuntimeVersion = 1;