summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraakulaga <[email protected]>2022-12-13 11:12:43 +0300
committeraakulaga <[email protected]>2022-12-13 11:12:43 +0300
commit2f75779388bb5f5a7fc534fbb62f6fd34f6e1387 (patch)
tree955f0aacb69b602129783f75ee096ee45ddabec6
parent02efb5d0f40aa0169d1fe65e769dde0e53fb8900 (diff)
HashShuffle settings added
HashShuffle settings added
-rw-r--r--ydb/library/yql/dq/tasks/dq_connection_builder.h8
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp2
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h7
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp6
4 files changed, 17 insertions, 6 deletions
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h
index 854cb6977b8..9ab5f580432 100644
--- a/ydb/library/yql/dq/tasks/dq_connection_builder.h
+++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h
@@ -3,14 +3,16 @@
#include <ydb/library/yql/dq/tasks/dq_tasks_graph.h>
#include <ydb/library/yql/dq/opt/dq_opt.h>
+
namespace NYql::NDq {
using TChannelLogFunc = std::function<void(ui64 channel, ui64 from, ui64 to, TStringBuf type, bool enableSpilling)>;
template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
-void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) {
+void CommonBuildTasks(double hashShuffleTasksRatio, ui32 maxHashShuffleTasks, TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) {
ui32 partitionsCount = 1;
+
auto& stageInfo = graph.GetStageInfo(stage);
for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) {
const auto& input = stage.Inputs().Item(inputIndex);
@@ -31,8 +33,8 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp
if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) {
auto shuffle = maybeCnShuffle.Cast();
auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage());
- partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2);
- partitionsCount = std::min(partitionsCount, 24u);
+ partitionsCount = std::max(partitionsCount, (ui32) (originStageInfo.Tasks.size() * hashShuffleTasksRatio) );
+ partitionsCount = std::min(partitionsCount, maxHashShuffleTasks);
} else if (auto maybeCnMap = input.Maybe<NNodes::TDqCnMap>()) {
auto cnMap = maybeCnMap.Cast();
auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage());
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 61bb14a3a5e..ac6d19720c8 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -55,6 +55,8 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, UseAggPhases);
REGISTER_SETTING(*this, ParallelOperationsLimit).Lower(1).Upper(128);
REGISTER_SETTING(*this, HashJoinMode).Parser([](const TString& v) { return FromString<NDq::EHashJoinMode>(v); });
+ REGISTER_SETTING(*this, HashShuffleTasksRatio).Lower(0.5).Upper(5);
+ REGISTER_SETTING(*this, HashShuffleMaxTasks).Lower(1).Upper(1000);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index daae87e9d66..448ceddf35b 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -32,6 +32,8 @@ struct TDqSettings {
static constexpr ui64 WatermarksGranularityMs = 1000;
static constexpr ui64 WatermarksLateArrivalDelayMs = 5000;
static constexpr ui64 ParallelOperationsLimit = 16;
+ static constexpr double HashShuffleTasksRatio = 0.5;
+ static constexpr ui32 HashShuffleMaxTasks = 24;
};
using TPtr = std::shared_ptr<TDqSettings>;
@@ -84,6 +86,8 @@ struct TDqSettings {
NCommon::TConfSetting<TString, false> WorkerFilter;
NCommon::TConfSetting<NDq::EHashJoinMode, false> HashJoinMode;
+ NCommon::TConfSetting<double, false> HashShuffleTasksRatio;
+ NCommon::TConfSetting<ui32, false> HashShuffleMaxTasks;
// This options will be passed to executor_actor and worker_actor
template <typename TProtoConfig>
@@ -127,7 +131,8 @@ struct TDqSettings {
SAVE_SETTING(WatermarksLateArrivalDelayMs);
SAVE_SETTING(UseAggPhases);
SAVE_SETTING(HashJoinMode);
-
+ SAVE_SETTING(HashShuffleTasksRatio);
+ SAVE_SETTING(HashShuffleMaxTasks);
#undef SAVE_SETTING
}
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index 46c3e90b4f8..0b120f2ea3f 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -153,7 +153,9 @@ namespace NYql::NDqs {
YQL_CLOG(TRACE, ProviderDq) << "Read stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr());
} else {
YQL_CLOG(TRACE, ProviderDq) << "Common stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr());
- NDq::CommonBuildTasks(TasksGraph, stage);
+ double hashShuffleTasksRatio = settings->HashShuffleTasksRatio.Get().GetOrElse(TDqSettings::TDefault::HashShuffleTasksRatio);
+ ui64 maxHashShuffleTasks = settings->HashShuffleMaxTasks.Get().GetOrElse(TDqSettings::TDefault::HashShuffleMaxTasks);
+ NDq::CommonBuildTasks(hashShuffleTasksRatio, maxHashShuffleTasks, TasksGraph, stage);
}
// Sinks
@@ -564,7 +566,7 @@ namespace NYql::NDqs {
continue; \
}
- void TDqsExecutionPlanner::BuildConnections(const NNodes::TDqPhyStage& stage) {
+ void TDqsExecutionPlanner::BuildConnections( const NNodes::TDqPhyStage& stage) {
NDq::TChannelLogFunc logFunc = [](ui64, ui64, ui64, TStringBuf, bool) {};
for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) {