diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2023-03-29 16:13:43 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2023-03-29 16:13:43 +0300 |
commit | 324b6378bef84fc409b596efabcb91320b003076 (patch) | |
tree | d388238452fd984a369cb119960e6572e517f799 | |
parent | 56370bd2ace043e8d98f1d54d01be86247e8f0b4 (diff) | |
download | ydb-324b6378bef84fc409b596efabcb91320b003076.tar.gz |
YQL-15415 YQL-15435 Support Top in Dq/Kqp.
-rw-r--r-- | ydb/core/kqp/opt/kqp_query_plan.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp | 32 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 11 | ||||
-rw-r--r-- | ydb/library/yql/core/expr_nodes/yql_expr_nodes.json | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 68 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 3 |
8 files changed, 123 insertions, 26 deletions
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 7e6f024b900..1cde106677f 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -923,6 +923,8 @@ private: operatorId = Visit(maybeCombiner.Cast(), planNode); } else if (auto maybeSort = TMaybeNode<TCoSort>(node)) { operatorId = Visit(maybeSort.Cast(), planNode); + } else if (auto maybeTop = TMaybeNode<TCoTop>(node)) { + operatorId = Visit(maybeTop.Cast(), planNode); } else if (auto maybeTopSort = TMaybeNode<TCoTopSort>(node)) { operatorId = Visit(maybeTopSort.Cast(), planNode); } else if (auto maybeTake = TMaybeNode<TCoTake>(node)) { @@ -991,6 +993,15 @@ private: return AddOperator(planNode, "Sort", std::move(op)); } + ui32 Visit(const TCoTop& top, TQueryPlanNode& planNode) { + TOperator op; + op.Properties["Name"] = "Top"; + op.Properties["TopBy"] = PrettyExprStr(top.KeySelectorLambda()); + op.Properties["Limit"] = PrettyExprStr(top.Count()); + + return AddOperator(planNode, "Top", std::move(op)); + } + ui32 Visit(const TCoTopSort& topSort, TQueryPlanNode& planNode) { TOperator op; op.Properties["Name"] = "TopSort"; diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 22af8b64c05..27ba5201f78 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -35,6 +35,7 @@ public: AddHandler(0, &TDqJoin::Match, HNDL(JoinToIndexLookup)); AddHandler(0, &TCoCalcOverWindowBase::Match, HNDL(ExpandWindowFunctions)); AddHandler(0, &TCoCalcOverWindowGroup::Match, HNDL(ExpandWindowFunctions)); + AddHandler(0, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead)); AddHandler(0, &TCoTopSort::Match, HNDL(RewriteTopSortOverIndexRead)); AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeOverIndexRead)); AddHandler(0, &TCoFlatMapBase::Match, HNDL(RewriteFlatMapOverExtend)); @@ -45,6 +46,7 @@ public: AddHandler(0, &TKqlReadTableRangesBase::Match, HNDL(ApplyExtractMembersToReadTableRanges<false>)); AddHandler(0, &TKqpReadOlapTableRangesBase::Match, HNDL(ApplyExtractMembersToReadOlapTable<false>)); AddHandler(0, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<false>)); + AddHandler(0, &TCoTop::Match, HNDL(TopSortOverExtend)); AddHandler(0, &TCoTopSort::Match, HNDL(TopSortOverExtend)); AddHandler(1, &TCoFlatMap::Match, HNDL(PushExtractedPredicateToReadTable)); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp index f08f1be6c3b..08acae7f07c 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp @@ -94,7 +94,7 @@ bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDesc return true; } -bool CanPushTopSort(const TCoTopSort& node, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { +bool CanPushTopSort(const TCoTopBase& node, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { return IsKeySelectorPkPrefix(node.KeySelectorLambda(), tableDesc, columns); } @@ -292,13 +292,13 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, // through TKqlLookupTable. // The simplest way is to match TopSort or Take over TKqlReadTableIndex. TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { - if (!node.Maybe<TCoTopSort>()) { + if (!node.Maybe<TCoTopBase>()) { return node; } - auto topSort = node.Maybe<TCoTopSort>().Cast(); + const auto topBase = node.Maybe<TCoTopBase>().Cast(); - if (auto maybeReadTableIndex = topSort.Input().Maybe<TKqlReadTableIndex>()) { + if (auto maybeReadTableIndex = topBase.Input().Maybe<TKqlReadTableIndex>()) { auto readTableIndex = maybeReadTableIndex.Cast(); const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, readTableIndex.Table().Path()); @@ -307,28 +307,30 @@ TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ct TVector<TString> sortByColumns; - if (!CanPushTopSort(topSort, indexDesc, &sortByColumns)) { + if (!CanPushTopSort(topBase, indexDesc, &sortByColumns)) { return node; } - auto filter = [&ctx, &node, &topSort](const TExprBase& in) mutable { - auto newTopSort = Build<TCoTopSort>(ctx, node.Pos()) + auto filter = [&ctx, &node, &topBase](const TExprBase& in) mutable { + auto newTop = Build<TCoTopBase>(ctx, node.Pos()) + .CallableName(node.Ref().Content()) .Input(in) - .KeySelectorLambda(ctx.DeepCopyLambda(topSort.KeySelectorLambda().Ref())) - .SortDirections(topSort.SortDirections()) - .Count(topSort.Count()) + .KeySelectorLambda(ctx.DeepCopyLambda(topBase.KeySelectorLambda().Ref())) + .SortDirections(topBase.SortDirections()) + .Count(topBase.Count()) .Done(); - return TExprBase(newTopSort); + return TExprBase(newTop); }; auto lookup = DoRewriteIndexRead(readTableIndex, ctx, tableDesc, indexMeta, kqpCtx.IsScanQuery(), sortByColumns, filter); - return Build<TCoTopSort>(ctx, node.Pos()) + return Build<TCoTopBase>(ctx, node.Pos()) + .CallableName(node.Ref().Content()) .Input(lookup) - .KeySelectorLambda(ctx.DeepCopyLambda(topSort.KeySelectorLambda().Ref())) - .SortDirections(topSort.SortDirections()) - .Count(topSort.Count()) + .KeySelectorLambda(ctx.DeepCopyLambda(topBase.KeySelectorLambda().Ref())) + .SortDirections(topBase.SortDirections()) + .Count(topBase.Count()) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp index 644242d1d7b..641bb926290 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp @@ -11,12 +11,12 @@ using namespace NYql::NDq; using namespace NYql::NNodes; TExprBase KqpTopSortOverExtend(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents) { - if (!node.Maybe<TCoTopSort>().Input().Maybe<TCoExtend>()) { + if (!node.Maybe<TCoTopBase>().Input().Maybe<TCoExtend>()) { return node; } - auto topSort = node.Cast<TCoTopSort>(); - auto extend = topSort.Input().Cast<TCoExtend>(); + auto topBase = node.Cast<TCoTopBase>(); + auto extend = topBase.Input().Cast<TCoExtend>(); if (!IsSingleConsumer(extend, parents)) { return node; @@ -25,11 +25,12 @@ TExprBase KqpTopSortOverExtend(NNodes::TExprBase node, TExprContext& ctx, const TVector<TExprBase> inputs; inputs.reserve(extend.ArgCount()); for (const auto& arg : extend) { - auto input = Build<TCoTopSort>(ctx, node.Pos()) + auto input = Build<TCoTopBase>(ctx, node.Pos()) + .CallableName(node.Ref().Content()) .Input(arg) - .Count(topSort.Count()) - .SortDirections(topSort.SortDirections()) - .KeySelectorLambda(topSort.KeySelectorLambda()) + .Count(topBase.Count()) + .SortDirections(topBase.SortDirections()) + .KeySelectorLambda(topBase.KeySelectorLambda()) .Done(); inputs.push_back(input); @@ -41,10 +42,10 @@ TExprBase KqpTopSortOverExtend(NNodes::TExprBase node, TExprContext& ctx, const .Input<TCoExtend>() .Add(inputs) .Build() - .SortDirections(topSort.SortDirections()) - .KeySelectorLambda(topSort.KeySelectorLambda()) + .SortDirections(topBase.SortDirections()) + .KeySelectorLambda(topBase.KeySelectorLambda()) .Build() - .Count(topSort.Count()) + .Count(topBase.Count()) .Done(); } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 1429e140d87..cb093f1c7dd 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -51,6 +51,7 @@ public: AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>)); AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<false>)); AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<false>)); + AddHandler(0, &TCoTop::Match, HNDL(BuildTopStage<false>)); AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>)); AddHandler(0, &TCoTakeBase::Match, HNDL(BuildTakeSkipStage<false>)); AddHandler(0, &TCoSortBase::Match, HNDL(BuildSortStage<false>)); @@ -98,6 +99,7 @@ public: AddHandler(1, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<true>)); AddHandler(1, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<true>)); AddHandler(1, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<true>)); + AddHandler(1, &TCoTop::Match, HNDL(BuildTopStage<true>)); AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>)); AddHandler(1, &TCoTakeBase::Match, HNDL(BuildTakeSkipStage<true>)); AddHandler(1, &TCoSortBase::Match, HNDL(BuildSortStage<true>)); @@ -292,6 +294,15 @@ protected: } template <bool IsGlobal> + TMaybeNode<TExprBase> BuildTopStage(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + TExprBase output = DqBuildTopStage(node, ctx, optCtx, *getParents(), IsGlobal); + DumpAppliedRule("BuildTopStage", node.Ptr(), output.Ptr(), ctx); + return output; + } + + template <bool IsGlobal> TMaybeNode<TExprBase> BuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index a277e75a499..278af745cd9 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -1548,7 +1548,6 @@ "Name": "TCoTopBase", "Base": "TCoInputBase", "Match": {"Type": "CallableBase"}, - "Builder": {"Generate": "None"}, "Children": [ {"Index": 1, "Name": "Count", "Type": "TExprBase"}, {"Index": 2, "Name": "SortDirections", "Type": "TExprBase"}, diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 4fdc19f8470..98c06c77754 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1536,6 +1536,74 @@ TExprBase GetSortDirection(TExprBase& sortDirections, size_t index) { } } // End of anonymous namespace +TExprBase DqBuildTopStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ + if (!node.Maybe<TCoTop>().Input().Maybe<TDqCnUnionAll>()) { + return node; + } + + const auto top = node.Cast<TCoTop>(); + const auto dqUnion = top.Input().Cast<TDqCnUnionAll>(); + if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) { + return node; + } + + if (!CanPushDqExpr(top.Count(), dqUnion) || !CanPushDqExpr(top.KeySelectorLambda(), dqUnion)) { + return node; + } + + if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) { + return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTopSort::idx_Input, std::move(connToPushableStage))); + } + + const auto result = dqUnion.Output().Stage().Program().Body(); + + const auto sortKeySelector = top.KeySelectorLambda(); + const auto sortDirections = top.SortDirections(); + const auto lambda = Build<TCoLambda>(ctx, top.Pos()) + .Args({"stream"}) + .Body<TCoTop>() + .Input("stream") + .KeySelectorLambda(ctx.DeepCopyLambda(top.KeySelectorLambda().Ref())) + .SortDirections(sortDirections) + .Count(top.Count()) + .Build() + .Done(); + + const auto stage = dqUnion.Output().Stage().Cast<TDqStage>(); + const auto newStage = DqPushLambdaToStage(stage, dqUnion.Output().Index(), lambda, {}, ctx, optCtx); + if (!newStage) { + return node; + } + + return Build<TDqCnUnionAll>(ctx, node.Pos()) + .Output() + .Stage<TDqStage>() + .Inputs() + .Add<TDqCnUnionAll>() + .Output() + .Stage(newStage.Cast()) + .Index(dqUnion.Output().Index()) + .Build() + .Build() + .Build() + .Program() + .Args({"stream"}) + .Body<TCoTop>() + .Input("stream") + .KeySelectorLambda(ctx.DeepCopyLambda(top.KeySelectorLambda().Ref())) + .SortDirections(top.SortDirections()) + .Count(top.Count()) + .Build() + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, top.Pos())) + .Build() + .Index().Build(0U) + .Build() + .Done(); +} + TExprBase DqBuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 06a78fc48e9..da514ea422c 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -60,6 +60,9 @@ NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprCont NNodes::TExprBase DqBuildAggregationResultStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx); +NNodes::TExprBase DqBuildTopStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage = true); + NNodes::TExprBase DqBuildTopSortStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); |