diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-30 16:19:48 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-30 16:19:48 +0300 |
commit | 83e5387f4dd51e161d8be3bca323a541b2d33dac (patch) | |
tree | a6541ca0cfe715bc852f85dde3b27c2ded090081 | |
parent | cbce569f4ae0dc85d379185bb16dfa46bc4186e8 (diff) | |
download | ydb-83e5387f4dd51e161d8be3bca323a541b2d33dac.tar.gz |
add options for aggregation executor configuration
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 36 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 11 |
6 files changed, 36 insertions, 21 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index a83e3b6848..5c09f69138 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -84,7 +84,7 @@ struct TEvKqpExecuter { }; IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters); + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation); std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure( IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index a844560fbb..9523883e75 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -145,7 +145,7 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters) + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction @@ -185,7 +185,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return data ? CreateKqpDataExecuter(std::move(request), database, userToken, counters) - : CreateKqpScanExecuter(std::move(request), database, userToken, counters); + : CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 2b312831ab..91e5a335be 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1097,7 +1097,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters); + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 6a3cde0308..d61f577ddd 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -45,8 +45,9 @@ public: } TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters) + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation) : TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter") + , AggregationSettings(aggregation) { YQL_ENSURE(Request.Transactions.size() == 1); YQL_ENSURE(Request.DataShardLocks.empty()); @@ -181,17 +182,21 @@ private: } }; - static ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo) { - // TODO: take into account number of active scans on node - const auto& stage = GetStage(stageInfo); - const bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() || - stage.GetProgram().GetSettings().GetHasMapJoin(); - - if (heavyProgram) { - return 4; + ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan) const { + ui32 result = 0; + if (isOlapScan) { + result = AggregationSettings.GetCSScanMinimalThreads(); } else { - return 16; + result = AggregationSettings.GetDSScanMinimalThreads(); + } + const auto& stage = GetStage(stageInfo); + if (stage.GetProgram().GetSettings().GetHasSort()) { + result = std::max(result, AggregationSettings.GetSortScanThreads()); + } + if (stage.GetProgram().GetSettings().GetHasMapJoin()) { + result = std::max(result, AggregationSettings.GetJoinScanThreads()); } + return result; } TTask& AssignTaskToShard( @@ -210,7 +215,7 @@ private: auto& tasks = nodeTasks[nodeId]; auto& cnt = assignedShardsCount[nodeId]; - const ui32 maxScansPerNode = isOlapScan ? 16 : GetMaxTasksPerNodeEstimate(stageInfo); + const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo, isOlapScan); if (cnt < maxScansPerNode) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.NodeId = nodeId; @@ -356,9 +361,9 @@ private: ui32 nodes = ShardsOnNode.size(); if (nodes) { // <= 2 tasks on node - partitionsCount = std::min(partitionsCount, std::min(24u, nodes * 2)); + partitionsCount = std::min(partitionsCount, std::min(AggregationSettings.GetAggregationComputeThreads(), nodes * 2)); } else { - partitionsCount = std::min(partitionsCount, 24u); + partitionsCount = std::min(partitionsCount, AggregationSettings.GetAggregationComputeThreads()); } break; } @@ -774,14 +779,15 @@ public: private: std::unordered_map<ui64, IActor*> ResultChannelProxies; + const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; }; } // namespace IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters) + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation) { - return new TKqpScanExecuter(std::move(request), database, userToken, counters); + return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation); } } // namespace NKqp diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index b888823564..496470b159 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1287,7 +1287,7 @@ public: auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, (QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(), - RequestCounters); + RequestCounters, Settings.Service.GetAggregationConfig()); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index eded529f54..5cb11b7677 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1186,6 +1186,14 @@ message TTableServiceConfig { optional uint32 LeakyBucketQuotaBucketDurationSeconds = 6 [default = 60]; } + message TAggregationConfig { + optional uint32 CSScanMinimalThreads = 29 [default = 16]; + optional uint32 DSScanMinimalThreads = 30 [default = 4]; + optional uint32 AggregationComputeThreads = 31 [default = 32]; + optional uint32 SortScanThreads = 32 [default = 16]; + optional uint32 JoinScanThreads = 33 [default = 16]; + } + optional uint32 QueryLimitBytes = 1; optional uint32 ParametersLimitBytes = 2; optional uint32 SessionsLimitPerNode = 3; @@ -1215,6 +1223,7 @@ message TTableServiceConfig { optional bool EnableKqpScanQuerySourceRead = 26 [default = false]; optional bool EnableKqpDataQuerySourceRead = 27 [default = false]; optional uint64 SessionIdleDurationSeconds = 28 [default = 600]; + optional TAggregationConfig AggregationConfig = 29; }; // Config describes immediate controls and allows @@ -1748,7 +1757,7 @@ message TCompatibilityRule { optional TYdbVersion BottomLimit = 2; optional TYdbVersion UpperLimit = 3; - // don't use enum, because stored data can have values from newer YDB versions, + // don't use enum, because stored data can have values from newer YDB versions, // which are not included in current version optional uint32 ComponentId = 4; |