diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-09 12:02:03 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-09 12:02:03 +0300 |
commit | 416b62668249e5744697c7b87783c83b991996a6 (patch) | |
tree | dedf11bd3bfc59b5c2cdbf6ef1b80a0be98e519c | |
parent | 87b0a5af3f32f90698677ebce58ad575e4d9838a (diff) | |
download | ydb-416b62668249e5744697c7b87783c83b991996a6.tar.gz |
Improve tasks calculation
-rw-r--r-- | library/cpp/actors/core/actorsystem.h | 15 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 43 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 8 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 11 | ||||
-rw-r--r-- | ydb/library/yql/dq/proto/dq_tasks.proto | 2 |
5 files changed, 64 insertions, 15 deletions
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 8051f5ee57..591eaf96b7 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -124,6 +124,15 @@ namespace NActors { ui32 GetThreads(ui32 poolId) const { return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId); } + + std::optional<ui32> GetThreads(const TString& poolName) const { + for (ui32 i = 0; i < GetExecutorsCount(); ++i) { + if (GetPoolName(i) == poolName) { + return GetThreads(i); + } + } + return {}; + } }; class TActorSystem : TNonCopyable { @@ -284,6 +293,12 @@ namespace NActors { } void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const; + std::optional<ui32> GetPoolThreadsCount(const TString& poolName) const { + if (!SystemSetup) { + return {}; + } + return SystemSetup->GetThreads(poolName); + } void DeferPreStop(std::function<void()> fn) { DeferredPreStop.push_back(std::move(fn)); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index b996a6e3f9..5bea057897 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -27,6 +27,7 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/core/log.h> +#include <util/system/info.h> namespace NKikimr { namespace NKqp { @@ -185,16 +186,43 @@ private: ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan) const { ui32 result = 0; if (isOlapScan) { - result = AggregationSettings.GetCSScanMinimalThreads(); + if (AggregationSettings.HasCSScanMinimalThreads()) { + result = AggregationSettings.GetCSScanMinimalThreads(); + } else { + std::optional<ui32> userPoolSize; + if (TlsActivationContext && TlsActivationContext->ActorSystem()) { + userPoolSize = 
TlsActivationContext->ActorSystem()->GetPoolThreadsCount("user"); + } + if (!userPoolSize) { + ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "user pool is undefined for executer tasks construction"; + userPoolSize = NSystemInfo::NumberOfCpus(); + } + result = *userPoolSize; + } } else { result = AggregationSettings.GetDSScanMinimalThreads(); + const auto& stage = GetStage(stageInfo); + if (stage.GetProgram().GetSettings().GetHasSort()) { + result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads()); + } + if (stage.GetProgram().GetSettings().GetHasMapJoin()) { + result = std::max(result, AggregationSettings.GetDSBaseJoinScanThreads()); + } } + return Max<ui32>(1, result); + } + + ui32 GetMaxTasksAggregation(TStageInfo& stageInfo, const ui32 previousTasksCount) const { + ui32 result = Max<ui32>(1, previousTasksCount / 2); const auto& stage = GetStage(stageInfo); + if (stage.GetProgram().GetSettings().GetHasAggregation() && !stage.GetProgram().GetSettings().GetHasFilter()) { + result *= AggregationSettings.GetAggregationHardThreadsKff(); + } if (stage.GetProgram().GetSettings().GetHasSort()) { - result = std::max(result, AggregationSettings.GetSortScanThreads()); + result *= AggregationSettings.GetAggregationSortThreadsKff(); } if (stage.GetProgram().GetSettings().GetHasMapJoin()) { - result = std::max(result, AggregationSettings.GetJoinScanThreads()); + result *= AggregationSettings.GetAggregationJoinThreadsKff(); } return result; } @@ -357,14 +385,7 @@ private: switch (input.GetTypeCase()) { case NKqpProto::TKqpPhyConnection::kHashShuffle: { - partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2); - ui32 nodes = ShardsOnNode.size(); - if (nodes) { - // <= 2 tasks on node - partitionsCount = std::min(partitionsCount, std::min(AggregationSettings.GetAggregationComputeThreads(), nodes * 2)); - } else { - partitionsCount = std::min(partitionsCount, AggregationSettings.GetAggregationComputeThreads()); - } + partitionsCount = 
Max<ui32>(1, GetMaxTasksAggregation(stageInfo, originStageInfo.Tasks.size())); break; } diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 356838f9c7..026c1edeaa 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -567,6 +567,8 @@ private: bool hasSort = false; bool hasMapJoin = false; bool hasUdf = false; + bool hasFilter = false; + bool hasWideCombiner = false; VisitExpr(stage.Program().Ptr(), [&](const TExprNode::TPtr& exprNode) { TExprBase node(exprNode); if (auto maybeReadTable = node.Maybe<TKqpWideReadTable>()) { @@ -638,6 +640,10 @@ private: FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange()); } else if (node.Maybe<TCoSort>()) { hasSort = true; + } else if (node.Maybe<TCoFilterBase>()) { + hasFilter = true; + } else if (node.Maybe<TCoWideCombiner>()) { + hasWideCombiner = true; } else if (node.Maybe<TCoMapJoinCore>()) { hasMapJoin = true; } else if (node.Maybe<TCoUdf>()) { @@ -678,6 +684,8 @@ private: programProto.MutableSettings()->SetHasMapJoin(hasMapJoin); programProto.MutableSettings()->SetHasSort(hasSort); programProto.MutableSettings()->SetHasUdf(hasUdf); + programProto.MutableSettings()->SetHasAggregation(hasWideCombiner); + programProto.MutableSettings()->SetHasFilter(hasFilter); for (auto member : paramsType->GetItems()) { auto paramName = TString(member->GetName()); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index aa007f7f38..2ac9d4fb68 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1194,11 +1194,14 @@ message TTableServiceConfig { } message TAggregationConfig { - optional uint32 CSScanMinimalThreads = 29 [default = 16]; + optional uint32 CSScanMinimalThreads = 29; optional uint32 DSScanMinimalThreads = 30 [default = 4]; - optional uint32 AggregationComputeThreads = 31 [default = 32]; - optional uint32 SortScanThreads = 32 
[default = 16]; - optional uint32 JoinScanThreads = 33 [default = 16]; + optional uint32 DSBaseSortScanThreads = 32 [default = 16]; + optional uint32 DSBaseJoinScanThreads = 33 [default = 16]; + + optional double AggregationHardThreadsKff = 34 [default = 2]; + optional double AggregationSortThreadsKff = 35 [default = 1]; + optional double AggregationJoinThreadsKff = 36 [default = 1]; } optional uint32 QueryLimitBytes = 1; diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index 0e7cfbdea1..708189c4b2 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -33,6 +33,8 @@ message TProgram { bool HasMapJoin = 1; bool HasSort = 2; bool HasUdf = 3; + bool HasAggregation = 4; + bool HasFilter = 5; } uint32 RuntimeVersion = 1; |