aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-01-30 16:19:48 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-01-30 16:19:48 +0300
commit83e5387f4dd51e161d8be3bca323a541b2d33dac (patch)
treea6541ca0cfe715bc852f85dde3b27c2ded090081
parentcbce569f4ae0dc85d379185bb16dfa46bc4186e8 (diff)
downloadydb-83e5387f4dd51e161d8be3bca323a541b2d33dac.tar.gz
add options for aggregation executor configuration
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp36
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp2
-rw-r--r--ydb/core/protos/config.proto11
6 files changed, 36 insertions, 21 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index a83e3b6848..5c09f69138 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -84,7 +84,7 @@ struct TEvKqpExecuter {
};
IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters);
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation);
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure(
IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index a844560fbb..9523883e75 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -145,7 +145,7 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters)
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
@@ -185,7 +185,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return data
? CreateKqpDataExecuter(std::move(request), database, userToken, counters)
- : CreateKqpScanExecuter(std::move(request), database, userToken, counters);
+ : CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 2b312831ab..91e5a335be 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -1097,7 +1097,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters);
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 6a3cde0308..d61f577ddd 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -45,8 +45,9 @@ public:
}
TKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters)
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation)
: TBase(std::move(request), database, userToken, counters, TWilsonKqp::ScanExecuter, "ScanExecuter")
+ , AggregationSettings(aggregation)
{
YQL_ENSURE(Request.Transactions.size() == 1);
YQL_ENSURE(Request.DataShardLocks.empty());
@@ -181,17 +182,21 @@ private:
}
};
- static ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo) {
- // TODO: take into account number of active scans on node
- const auto& stage = GetStage(stageInfo);
- const bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() ||
- stage.GetProgram().GetSettings().GetHasMapJoin();
-
- if (heavyProgram) {
- return 4;
+ ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan) const {
+ ui32 result = 0;
+ if (isOlapScan) {
+ result = AggregationSettings.GetCSScanMinimalThreads();
} else {
- return 16;
+ result = AggregationSettings.GetDSScanMinimalThreads();
+ }
+ const auto& stage = GetStage(stageInfo);
+ if (stage.GetProgram().GetSettings().GetHasSort()) {
+ result = std::max(result, AggregationSettings.GetSortScanThreads());
+ }
+ if (stage.GetProgram().GetSettings().GetHasMapJoin()) {
+ result = std::max(result, AggregationSettings.GetJoinScanThreads());
}
+ return result;
}
TTask& AssignTaskToShard(
@@ -210,7 +215,7 @@ private:
auto& tasks = nodeTasks[nodeId];
auto& cnt = assignedShardsCount[nodeId];
- const ui32 maxScansPerNode = isOlapScan ? 16 : GetMaxTasksPerNodeEstimate(stageInfo);
+ const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo, isOlapScan);
if (cnt < maxScansPerNode) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.NodeId = nodeId;
@@ -356,9 +361,9 @@ private:
ui32 nodes = ShardsOnNode.size();
if (nodes) {
// <= 2 tasks on node
- partitionsCount = std::min(partitionsCount, std::min(24u, nodes * 2));
+ partitionsCount = std::min(partitionsCount, std::min(AggregationSettings.GetAggregationComputeThreads(), nodes * 2));
} else {
- partitionsCount = std::min(partitionsCount, 24u);
+ partitionsCount = std::min(partitionsCount, AggregationSettings.GetAggregationComputeThreads());
}
break;
}
@@ -774,14 +779,15 @@ public:
private:
std::unordered_map<ui64, IActor*> ResultChannelProxies;
+ const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
};
} // namespace
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters)
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation)
{
- return new TKqpScanExecuter(std::move(request), database, userToken, counters);
+ return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index b888823564..496470b159 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1287,7 +1287,7 @@ public:
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
(QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(),
- RequestCounters);
+ RequestCounters, Settings.Service.GetAggregationConfig());
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index eded529f54..5cb11b7677 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1186,6 +1186,14 @@ message TTableServiceConfig {
optional uint32 LeakyBucketQuotaBucketDurationSeconds = 6 [default = 60];
}
+ message TAggregationConfig {
+ optional uint32 CSScanMinimalThreads = 29 [default = 16];
+ optional uint32 DSScanMinimalThreads = 30 [default = 4];
+ optional uint32 AggregationComputeThreads = 31 [default = 32];
+ optional uint32 SortScanThreads = 32 [default = 16];
+ optional uint32 JoinScanThreads = 33 [default = 16];
+ }
+
optional uint32 QueryLimitBytes = 1;
optional uint32 ParametersLimitBytes = 2;
optional uint32 SessionsLimitPerNode = 3;
@@ -1215,6 +1223,7 @@ message TTableServiceConfig {
optional bool EnableKqpScanQuerySourceRead = 26 [default = false];
optional bool EnableKqpDataQuerySourceRead = 27 [default = false];
optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
+ optional TAggregationConfig AggregationConfig = 29;
};
// Config describes immediate controls and allows
@@ -1748,7 +1757,7 @@ message TCompatibilityRule {
optional TYdbVersion BottomLimit = 2;
optional TYdbVersion UpperLimit = 3;
- // don't use enum, because stored data can have values from newer YDB versions,
+ // don't use enum, because stored data can have values from newer YDB versions,
// which are not included in current version
optional uint32 ComponentId = 4;