about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-02-09 12:02:03 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-02-09 12:02:03 +0300
commit 416b62668249e5744697c7b87783c83b991996a6 (patch)
tree dedf11bd3bfc59b5c2cdbf6ef1b80a0be98e519c
parent 87b0a5af3f32f90698677ebce58ad575e4d9838a (diff)
download ydb-416b62668249e5744697c7b87783c83b991996a6.tar.gz
tasks calculation improve
-rw-r--r-- library/cpp/actors/core/actorsystem.h 15
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_scan_executer.cpp 43
-rw-r--r-- ydb/core/kqp/query_compiler/kqp_query_compiler.cpp 8
-rw-r--r-- ydb/core/protos/config.proto 11
-rw-r--r-- ydb/library/yql/dq/proto/dq_tasks.proto 2
5 files changed, 64 insertions, 15 deletions
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 8051f5ee57..591eaf96b7 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -124,6 +124,15 @@ namespace NActors {
ui32 GetThreads(ui32 poolId) const {
return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId);
}
+
+ std::optional<ui32> GetThreads(const TString& poolName) const {
+ for (ui32 i = 0; i < GetExecutorsCount(); ++i) {
+ if (GetPoolName(i) == poolName) {
+ return GetThreads(i);
+ }
+ }
+ return {};
+ }
};
class TActorSystem : TNonCopyable {
@@ -284,6 +293,12 @@ namespace NActors {
}
void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const;
+ std::optional<ui32> GetPoolThreadsCount(const TString& poolName) const {
+ if (!SystemSetup) {
+ return {};
+ }
+ return SystemSetup->GetThreads(poolName);
+ }
void DeferPreStop(std::function<void()> fn) {
DeferredPreStop.push_back(std::move(fn));
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index b996a6e3f9..5bea057897 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -27,6 +27,7 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/core/log.h>
+#include <util/system/info.h>
namespace NKikimr {
namespace NKqp {
@@ -185,16 +186,43 @@ private:
ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan) const {
ui32 result = 0;
if (isOlapScan) {
- result = AggregationSettings.GetCSScanMinimalThreads();
+ if (AggregationSettings.HasCSScanMinimalThreads()) {
+ result = AggregationSettings.GetCSScanMinimalThreads();
+ } else {
+ std::optional<ui32> userPoolSize;
+ if (TlsActivationContext && TlsActivationContext->ActorSystem()) {
+ userPoolSize = TlsActivationContext->ActorSystem()->GetPoolThreadsCount("user");
+ }
+ if (!userPoolSize) {
+ ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "user pool is undefined for executer tasks construction";
+ userPoolSize = NSystemInfo::NumberOfCpus();
+ }
+ result = *userPoolSize;
+ }
} else {
result = AggregationSettings.GetDSScanMinimalThreads();
+ const auto& stage = GetStage(stageInfo);
+ if (stage.GetProgram().GetSettings().GetHasSort()) {
+ result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads());
+ }
+ if (stage.GetProgram().GetSettings().GetHasMapJoin()) {
+ result = std::max(result, AggregationSettings.GetDSBaseJoinScanThreads());
+ }
}
+ return Max<ui32>(1, result);
+ }
+
+ ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount) const {
+ ui32 result = Max<ui32>(1, previousTasksCount / 2);
const auto& stage = GetStage(stageInfo);
+ if (stage.GetProgram().GetSettings().GetHasAggregation() && !stage.GetProgram().GetSettings().GetHasFilter()) {
+ result *= AggregationSettings.GetAggregationHardThreadsKff();
+ }
if (stage.GetProgram().GetSettings().GetHasSort()) {
- result = std::max(result, AggregationSettings.GetSortScanThreads());
+ result *= AggregationSettings.GetAggregationSortThreadsKff();
}
if (stage.GetProgram().GetSettings().GetHasMapJoin()) {
- result = std::max(result, AggregationSettings.GetJoinScanThreads());
+ result *= AggregationSettings.GetAggregationJoinThreadsKff();
}
return result;
}
@@ -357,14 +385,7 @@ private:
switch (input.GetTypeCase()) {
case NKqpProto::TKqpPhyConnection::kHashShuffle: {
- partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2);
- ui32 nodes = ShardsOnNode.size();
- if (nodes) {
- // <= 2 tasks on node
- partitionsCount = std::min(partitionsCount, std::min(AggregationSettings.GetAggregationComputeThreads(), nodes * 2));
- } else {
- partitionsCount = std::min(partitionsCount, AggregationSettings.GetAggregationComputeThreads());
- }
+ partitionsCount = Max<ui32>(1, GetMaxTasksAggregation(stageInfo, originStageInfo.Tasks.size()));
break;
}
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 356838f9c7..026c1edeaa 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -567,6 +567,8 @@ private:
bool hasSort = false;
bool hasMapJoin = false;
bool hasUdf = false;
+ bool hasFilter = false;
+ bool hasWideCombiner = false;
VisitExpr(stage.Program().Ptr(), [&](const TExprNode::TPtr& exprNode) {
TExprBase node(exprNode);
if (auto maybeReadTable = node.Maybe<TKqpWideReadTable>()) {
@@ -638,6 +640,10 @@ private:
FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange());
} else if (node.Maybe<TCoSort>()) {
hasSort = true;
+ } else if (node.Maybe<TCoFilterBase>()) {
+ hasFilter = true;
+ } else if (node.Maybe<TCoWideCombiner>()) {
+ hasWideCombiner = true;
} else if (node.Maybe<TCoMapJoinCore>()) {
hasMapJoin = true;
} else if (node.Maybe<TCoUdf>()) {
@@ -678,6 +684,8 @@ private:
programProto.MutableSettings()->SetHasMapJoin(hasMapJoin);
programProto.MutableSettings()->SetHasSort(hasSort);
programProto.MutableSettings()->SetHasUdf(hasUdf);
+ programProto.MutableSettings()->SetHasAggregation(hasWideCombiner);
+ programProto.MutableSettings()->SetHasFilter(hasFilter);
for (auto member : paramsType->GetItems()) {
auto paramName = TString(member->GetName());
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index aa007f7f38..2ac9d4fb68 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1194,11 +1194,14 @@ message TTableServiceConfig {
}
message TAggregationConfig {
- optional uint32 CSScanMinimalThreads = 29 [default = 16];
+ optional uint32 CSScanMinimalThreads = 29;
optional uint32 DSScanMinimalThreads = 30 [default = 4];
- optional uint32 AggregationComputeThreads = 31 [default = 32];
- optional uint32 SortScanThreads = 32 [default = 16];
- optional uint32 JoinScanThreads = 33 [default = 16];
+ optional uint32 DSBaseSortScanThreads = 32 [default = 16];
+ optional uint32 DSBaseJoinScanThreads = 33 [default = 16];
+
+ optional double AggregationHardThreadsKff = 34 [default = 2];
+ optional double AggregationSortThreadsKff = 35 [default = 1];
+ optional double AggregationJoinThreadsKff = 36 [default = 1];
}
optional uint32 QueryLimitBytes = 1;
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index 0e7cfbdea1..708189c4b2 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -33,6 +33,8 @@ message TProgram {
bool HasMapJoin = 1;
bool HasSort = 2;
bool HasUdf = 3;
+ bool HasAggregation = 4;
+ bool HasFilter = 5;
}
uint32 RuntimeVersion = 1;