diff options
author | aakulaga <[email protected]> | 2022-12-13 11:12:43 +0300 |
---|---|---|
committer | aakulaga <[email protected]> | 2022-12-13 11:12:43 +0300 |
commit | 2f75779388bb5f5a7fc534fbb62f6fd34f6e1387 (patch) | |
tree | 955f0aacb69b602129783f75ee096ee45ddabec6 | |
parent | 02efb5d0f40aa0169d1fe65e769dde0e53fb8900 (diff) |
HashShuffle settings added
HashShuffle settings added
4 files changed, 17 insertions, 6 deletions
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index 854cb6977b8..9ab5f580432 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -3,14 +3,16 @@ #include <ydb/library/yql/dq/tasks/dq_tasks_graph.h> #include <ydb/library/yql/dq/opt/dq_opt.h> + namespace NYql::NDq { using TChannelLogFunc = std::function<void(ui64 channel, ui64 from, ui64 to, TStringBuf type, bool enableSpilling)>; template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> -void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) { +void CommonBuildTasks(double hashShuffleTasksRatio, ui32 maxHashShuffleTasks, TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) { ui32 partitionsCount = 1; + auto& stageInfo = graph.GetStageInfo(stage); for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { const auto& input = stage.Inputs().Item(inputIndex); @@ -31,8 +33,8 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) { auto shuffle = maybeCnShuffle.Cast(); auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage()); - partitionsCount = std::max(partitionsCount, (ui32)originStageInfo.Tasks.size() / 2); - partitionsCount = std::min(partitionsCount, 24u); + partitionsCount = std::max(partitionsCount, (ui32) (originStageInfo.Tasks.size() * hashShuffleTasksRatio) ); + partitionsCount = std::min(partitionsCount, maxHashShuffleTasks); } else if (auto maybeCnMap = input.Maybe<NNodes::TDqCnMap>()) { auto cnMap = maybeCnMap.Cast(); auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage()); diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 61bb14a3a5e..ac6d19720c8 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -55,6 +55,8 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, UseAggPhases); REGISTER_SETTING(*this, ParallelOperationsLimit).Lower(1).Upper(128); REGISTER_SETTING(*this, HashJoinMode).Parser([](const TString& v) { return FromString<NDq::EHashJoinMode>(v); }); + REGISTER_SETTING(*this, HashShuffleTasksRatio).Lower(0.5).Upper(5); + REGISTER_SETTING(*this, HashShuffleMaxTasks).Lower(1).Upper(1000); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index daae87e9d66..448ceddf35b 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -32,6 +32,8 @@ struct TDqSettings { static constexpr ui64 WatermarksGranularityMs = 1000; static constexpr ui64 WatermarksLateArrivalDelayMs = 5000; static constexpr ui64 ParallelOperationsLimit = 16; + static constexpr double HashShuffleTasksRatio = 0.5; + static constexpr ui32 HashShuffleMaxTasks = 24; }; using TPtr = std::shared_ptr<TDqSettings>; @@ -84,6 +86,8 @@ struct TDqSettings { NCommon::TConfSetting<TString, false> WorkerFilter; NCommon::TConfSetting<NDq::EHashJoinMode, false> HashJoinMode; + NCommon::TConfSetting<double, false> HashShuffleTasksRatio; + NCommon::TConfSetting<ui32, false> HashShuffleMaxTasks; // This options will be passed to executor_actor and worker_actor template <typename TProtoConfig> @@ -127,7 +131,8 @@ struct TDqSettings { SAVE_SETTING(WatermarksLateArrivalDelayMs); SAVE_SETTING(UseAggPhases); SAVE_SETTING(HashJoinMode); - + SAVE_SETTING(HashShuffleTasksRatio); + SAVE_SETTING(HashShuffleMaxTasks); #undef SAVE_SETTING } diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 46c3e90b4f8..0b120f2ea3f 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -153,7 +153,9 @@ namespace NYql::NDqs { YQL_CLOG(TRACE, ProviderDq) << "Read stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr()); } else { YQL_CLOG(TRACE, ProviderDq) << "Common stage " << NCommon::ExprToPrettyString(ExprContext, *stage.Ptr()); - NDq::CommonBuildTasks(TasksGraph, stage); + double hashShuffleTasksRatio = settings->HashShuffleTasksRatio.Get().GetOrElse(TDqSettings::TDefault::HashShuffleTasksRatio); + ui64 maxHashShuffleTasks = settings->HashShuffleMaxTasks.Get().GetOrElse(TDqSettings::TDefault::HashShuffleMaxTasks); + NDq::CommonBuildTasks(hashShuffleTasksRatio, maxHashShuffleTasks, TasksGraph, stage); } // Sinks @@ -564,7 +566,7 @@ namespace NYql::NDqs { continue; \ } - void TDqsExecutionPlanner::BuildConnections(const NNodes::TDqPhyStage& stage) { + void TDqsExecutionPlanner::BuildConnections( const NNodes::TDqPhyStage& stage) { NDq::TChannelLogFunc logFunc = [](ui64, ui64, ui64, TStringBuf, bool) {}; for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { |