diff options
author | aozeritsky <[email protected]> | 2023-10-03 15:07:14 +0300 |
---|---|---|
committer | aozeritsky <[email protected]> | 2023-10-03 16:21:29 +0300 |
commit | f847a2d778260a7998d5bf7fb77709eb3060e2bc (patch) | |
tree | 7e62a300d4a9e08db8a499e4787520f02a726233 | |
parent | b8c57b027c3584fb00c34fa01a42b1bfb0282521 (diff) |
Don't change partitionsCount after first Map connection
-rw-r--r-- | ydb/library/yql/dq/tasks/dq_connection_builder.h | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index 7401576c613..a53dce77728 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -30,15 +30,18 @@ void CommonBuildTasks(double hashShuffleTasksRatio, ui32 maxHashShuffleTasks, TD input.Maybe<NNodes::TDqCnHashShuffle>(), "" << input.Ref().Content()); } - if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) { + if (input.Maybe<NNodes::TDqCnUnionAll>() || input.Maybe<NNodes::TDqCnMerge>()) { + // Prevent UnionAll after Map or Shuffle + YQL_ENSURE(partitionsCount == 1); + } else if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) { auto shuffle = maybeCnShuffle.Cast(); - auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage()); + const auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage()); partitionsCount = std::max(partitionsCount, (ui32) (originStageInfo.Tasks.size() * hashShuffleTasksRatio) ); partitionsCount = std::min(partitionsCount, maxHashShuffleTasks); } else if (auto maybeCnMap = input.Maybe<NNodes::TDqCnMap>()) { auto cnMap = maybeCnMap.Cast(); - auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage()); - partitionsCount = originStageInfo.Tasks.size(); + const auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage()); + maxHashShuffleTasks = partitionsCount = originStageInfo.Tasks.size(); } } |