summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <[email protected]>2023-03-30 11:28:31 +0300
committera-romanov <[email protected]>2023-03-30 11:28:31 +0300
commit3de7ef56f40d8d7d5b6afbef0cae1732dc5784fe (patch)
tree6f9d7639f2d7d7111ae2a5cdf06c96ac6b90a1bf
parent215150b85af0221f3ccf0e8681f8adef1efe6ca9 (diff)
YQL-15415 YQL-15435 Fix build TopSort stage in Dq.
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp49
1 files changed, 27 insertions, 22 deletions
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 98c06c77754..345ab24c922 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1524,7 +1524,7 @@ bool AddSortColumn(const TExprBase& key, const TExprBase& ascending, TExprContex
return true;
}
-TExprBase GetSortDirection(TExprBase& sortDirections, size_t index) {
+TExprBase GetSortDirection(const TExprBase& sortDirections, size_t index) {
TExprNode::TPtr sortDirection;
if (sortDirections.Maybe<TExprList>()) {
YQL_ENSURE(index < sortDirections.Cast<TExprList>().Size());
@@ -1554,7 +1554,7 @@ TExprBase DqBuildTopStage(TExprBase node, TExprContext& ctx, IOptimizationContex
}
if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) {
- return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTopSort::idx_Input, std::move(connToPushableStage)));
+ return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTop::idx_Input, std::move(connToPushableStage)));
}
const auto result = dqUnion.Output().Stage().Program().Body();
@@ -1625,25 +1625,10 @@ TExprBase DqBuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationCo
return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTopSort::idx_Input, std::move(connToPushableStage)));
}
- auto result = dqUnion.Output().Stage().Program().Body();
-
- auto sortKeySelector = topSort.KeySelectorLambda();
- auto sortDirections = topSort.SortDirections();
- auto lambda = Build<TCoLambda>(ctx, topSort.Pos())
- .Args({"stream"})
- .Body<TCoTopSort>()
- .Input("stream")
- .KeySelectorLambda(ctx.DeepCopyLambda(topSort.KeySelectorLambda().Ref()))
- .SortDirections(sortDirections)
- .Count(topSort.Count())
- .Build()
- .Done();
+ const auto result = dqUnion.Output().Stage().Program().Body();
- auto stage = dqUnion.Output().Stage().Cast<TDqStage>();
- auto newStage = DqPushLambdaToStage(stage, dqUnion.Output().Index(), lambda, {}, ctx, optCtx);
- if (!newStage) {
- return node;
- }
+ const auto sortKeySelector = topSort.KeySelectorLambda();
+ const auto sortDirections = topSort.SortDirections();
bool canMerge = true;
auto sortColumnList = Build<TDqSortColumnList>(ctx, node.Pos());
@@ -1667,8 +1652,27 @@ TExprBase DqBuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationCo
canMerge = AddSortColumn(lambdaBody, sortDirections, ctx, node, sortKeySelector, sortColumnList, sortKeyTypes);
}
+ canMerge = canMerge && IsMergeConnectionApplicable(sortKeyTypes);
+
+ auto lambda = Build<TCoLambda>(ctx, topSort.Pos())
+ .Args({"stream"})
+ .Body<TCoTopBase>()
+ .CallableName(canMerge ? TCoTopSort::CallableName() : TCoTop::CallableName())
+ .Input("stream")
+ .KeySelectorLambda(ctx.DeepCopyLambda(topSort.KeySelectorLambda().Ref()))
+ .SortDirections(sortDirections)
+ .Count(topSort.Count())
+ .Build()
+ .Done();
+
+ auto stage = dqUnion.Output().Stage().Cast<TDqStage>();
+ auto newStage = DqPushLambdaToStage(stage, dqUnion.Output().Index(), lambda, {}, ctx, optCtx);
+ if (!newStage) {
+ return node;
+ }
+
TMaybeNode<TDqStage> outerStage;
- if (canMerge && IsMergeConnectionApplicable(sortKeyTypes)) {
+ if (canMerge) {
auto mergeCn = Build<TDqCnMerge>(ctx, node.Pos())
.Output()
.Stage(newStage.Cast())
@@ -1716,10 +1720,11 @@ TExprBase DqBuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationCo
.Done();
}
+ // TODO: Use CnMerge or AssumeSorted for keep constraints.
return Build<TDqCnUnionAll>(ctx, node.Pos())
.Output()
.Stage(outerStage.Cast())
- .Index().Build("0")
+ .Index().Build(0U)
.Build()
.Done();
}