summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <[email protected]>2023-10-03 15:07:14 +0300
committeraozeritsky <[email protected]>2023-10-03 16:21:29 +0300
commitf847a2d778260a7998d5bf7fb77709eb3060e2bc (patch)
tree7e62a300d4a9e08db8a499e4787520f02a726233
parentb8c57b027c3584fb00c34fa01a42b1bfb0282521 (diff)
Don't change partitionsCount after first Map connection
-rw-r--r--ydb/library/yql/dq/tasks/dq_connection_builder.h11
1 files changed, 7 insertions, 4 deletions
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h
index 7401576c613..a53dce77728 100644
--- a/ydb/library/yql/dq/tasks/dq_connection_builder.h
+++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h
@@ -30,15 +30,18 @@ void CommonBuildTasks(double hashShuffleTasksRatio, ui32 maxHashShuffleTasks, TD
input.Maybe<NNodes::TDqCnHashShuffle>(), "" << input.Ref().Content());
}
- if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) {
+ if (input.Maybe<NNodes::TDqCnUnionAll>() || input.Maybe<NNodes::TDqCnMerge>()) {
+ // Prevent UnionAll after Map or Shuffle
+ YQL_ENSURE(partitionsCount == 1);
+ } else if (auto maybeCnShuffle = input.Maybe<NNodes::TDqCnHashShuffle>()) {
auto shuffle = maybeCnShuffle.Cast();
- auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage());
+ const auto& originStageInfo = graph.GetStageInfo(shuffle.Output().Stage());
partitionsCount = std::max(partitionsCount, (ui32) (originStageInfo.Tasks.size() * hashShuffleTasksRatio) );
partitionsCount = std::min(partitionsCount, maxHashShuffleTasks);
} else if (auto maybeCnMap = input.Maybe<NNodes::TDqCnMap>()) {
auto cnMap = maybeCnMap.Cast();
- auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage());
- partitionsCount = originStageInfo.Tasks.size();
+ const auto& originStageInfo = graph.GetStageInfo(cnMap.Output().Stage());
+ maxHashShuffleTasks = partitionsCount = originStageInfo.Tasks.size();
}
}