diff options
author | a-romanov <[email protected]> | 2023-03-30 11:28:31 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2023-03-30 11:28:31 +0300 |
commit | 3de7ef56f40d8d7d5b6afbef0cae1732dc5784fe (patch) | |
tree | 6f9d7639f2d7d7111ae2a5cdf06c96ac6b90a1bf | |
parent | 215150b85af0221f3ccf0e8681f8adef1efe6ca9 (diff) |
YQL-15415 YQL-15435 Fix build TopSort stage in Dq.
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 49 |
1 files changed, 27 insertions, 22 deletions
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 98c06c77754..345ab24c922 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1524,7 +1524,7 @@ bool AddSortColumn(const TExprBase& key, const TExprBase& ascending, TExprContex return true; } -TExprBase GetSortDirection(TExprBase& sortDirections, size_t index) { +TExprBase GetSortDirection(const TExprBase& sortDirections, size_t index) { TExprNode::TPtr sortDirection; if (sortDirections.Maybe<TExprList>()) { YQL_ENSURE(index < sortDirections.Cast<TExprList>().Size()); @@ -1554,7 +1554,7 @@ TExprBase DqBuildTopStage(TExprBase node, TExprContext& ctx, IOptimizationContex } if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) { - return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTopSort::idx_Input, std::move(connToPushableStage))); + return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTop::idx_Input, std::move(connToPushableStage))); } const auto result = dqUnion.Output().Stage().Program().Body(); @@ -1625,25 +1625,10 @@ TExprBase DqBuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationCo return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTopSort::idx_Input, std::move(connToPushableStage))); } - auto result = dqUnion.Output().Stage().Program().Body(); - - auto sortKeySelector = topSort.KeySelectorLambda(); - auto sortDirections = topSort.SortDirections(); - auto lambda = Build<TCoLambda>(ctx, topSort.Pos()) - .Args({"stream"}) - .Body<TCoTopSort>() - .Input("stream") - .KeySelectorLambda(ctx.DeepCopyLambda(topSort.KeySelectorLambda().Ref())) - .SortDirections(sortDirections) - .Count(topSort.Count()) - .Build() - .Done(); + const auto result = dqUnion.Output().Stage().Program().Body(); - auto stage = dqUnion.Output().Stage().Cast<TDqStage>(); - auto newStage = DqPushLambdaToStage(stage, dqUnion.Output().Index(), lambda, {}, ctx, optCtx); - if (!newStage) { - return node; - } + const auto sortKeySelector = topSort.KeySelectorLambda(); + const auto sortDirections = topSort.SortDirections(); bool canMerge = true; auto sortColumnList = Build<TDqSortColumnList>(ctx, node.Pos()); @@ -1667,8 +1652,27 @@ TExprBase DqBuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationCo canMerge = AddSortColumn(lambdaBody, sortDirections, ctx, node, sortKeySelector, sortColumnList, sortKeyTypes); } + canMerge = canMerge && IsMergeConnectionApplicable(sortKeyTypes); + + auto lambda = Build<TCoLambda>(ctx, topSort.Pos()) + .Args({"stream"}) + .Body<TCoTopBase>() + .CallableName(canMerge ? TCoTopSort::CallableName() : TCoTop::CallableName()) + .Input("stream") + .KeySelectorLambda(ctx.DeepCopyLambda(topSort.KeySelectorLambda().Ref())) + .SortDirections(sortDirections) + .Count(topSort.Count()) + .Build() + .Done(); + + auto stage = dqUnion.Output().Stage().Cast<TDqStage>(); + auto newStage = DqPushLambdaToStage(stage, dqUnion.Output().Index(), lambda, {}, ctx, optCtx); + if (!newStage) { + return node; + } + TMaybeNode<TDqStage> outerStage; - if (canMerge && IsMergeConnectionApplicable(sortKeyTypes)) { + if (canMerge) { auto mergeCn = Build<TDqCnMerge>(ctx, node.Pos()) .Output() .Stage(newStage.Cast()) @@ -1716,10 +1720,11 @@ TExprBase DqBuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationCo .Done(); } + // TODO: Use CnMerge or AssumeSorted for keep constraints. return Build<TDqCnUnionAll>(ctx, node.Pos()) .Output() .Stage(outerStage.Cast()) - .Index().Build("0") + .Index().Build(0U) .Build() .Done(); } |