aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2023-03-29 16:13:43 +0300
committera-romanov <Anton.Romanov@ydb.tech>2023-03-29 16:13:43 +0300
commit324b6378bef84fc409b596efabcb91320b003076 (patch)
treed388238452fd984a369cb119960e6572e517f799
parent56370bd2ace043e8d98f1d54d01be86247e8f0b4 (diff)
downloadydb-324b6378bef84fc409b596efabcb91320b003076.tar.gz
YQL-15415 YQL-15435 Support Top in Dq/Kqp.
-rw-r--r--ydb/core/kqp/opt/kqp_query_plan.cpp11
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log.cpp2
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp32
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp21
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp11
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json1
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp68
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h3
8 files changed, 123 insertions, 26 deletions
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index 7e6f024b900..1cde106677f 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -923,6 +923,8 @@ private:
operatorId = Visit(maybeCombiner.Cast(), planNode);
} else if (auto maybeSort = TMaybeNode<TCoSort>(node)) {
operatorId = Visit(maybeSort.Cast(), planNode);
+ } else if (auto maybeTop = TMaybeNode<TCoTop>(node)) {
+ operatorId = Visit(maybeTop.Cast(), planNode);
} else if (auto maybeTopSort = TMaybeNode<TCoTopSort>(node)) {
operatorId = Visit(maybeTopSort.Cast(), planNode);
} else if (auto maybeTake = TMaybeNode<TCoTake>(node)) {
@@ -991,6 +993,15 @@ private:
return AddOperator(planNode, "Sort", std::move(op));
}
+ ui32 Visit(const TCoTop& top, TQueryPlanNode& planNode) {
+ TOperator op;
+ op.Properties["Name"] = "Top";
+ op.Properties["TopBy"] = PrettyExprStr(top.KeySelectorLambda());
+ op.Properties["Limit"] = PrettyExprStr(top.Count());
+
+ return AddOperator(planNode, "Top", std::move(op));
+ }
+
ui32 Visit(const TCoTopSort& topSort, TQueryPlanNode& planNode) {
TOperator op;
op.Properties["Name"] = "TopSort";
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
index 22af8b64c05..27ba5201f78 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
@@ -35,6 +35,7 @@ public:
AddHandler(0, &TDqJoin::Match, HNDL(JoinToIndexLookup));
AddHandler(0, &TCoCalcOverWindowBase::Match, HNDL(ExpandWindowFunctions));
AddHandler(0, &TCoCalcOverWindowGroup::Match, HNDL(ExpandWindowFunctions));
+ AddHandler(0, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead));
AddHandler(0, &TCoTopSort::Match, HNDL(RewriteTopSortOverIndexRead));
AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeOverIndexRead));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(RewriteFlatMapOverExtend));
@@ -45,6 +46,7 @@ public:
AddHandler(0, &TKqlReadTableRangesBase::Match, HNDL(ApplyExtractMembersToReadTableRanges<false>));
AddHandler(0, &TKqpReadOlapTableRangesBase::Match, HNDL(ApplyExtractMembersToReadOlapTable<false>));
AddHandler(0, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<false>));
+ AddHandler(0, &TCoTop::Match, HNDL(TopSortOverExtend));
AddHandler(0, &TCoTopSort::Match, HNDL(TopSortOverExtend));
AddHandler(1, &TCoFlatMap::Match, HNDL(PushExtractedPredicateToReadTable));
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
index f08f1be6c3b..08acae7f07c 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
@@ -94,7 +94,7 @@ bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDesc
return true;
}
-bool CanPushTopSort(const TCoTopSort& node, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) {
+bool CanPushTopSort(const TCoTopBase& node, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) {
return IsKeySelectorPkPrefix(node.KeySelectorLambda(), tableDesc, columns);
}
@@ -292,13 +292,13 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
// through TKqlLookupTable.
// The simplest way is to match TopSort or Take over TKqlReadTableIndex.
TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
- if (!node.Maybe<TCoTopSort>()) {
+ if (!node.Maybe<TCoTopBase>()) {
return node;
}
- auto topSort = node.Maybe<TCoTopSort>().Cast();
+ const auto topBase = node.Maybe<TCoTopBase>().Cast();
- if (auto maybeReadTableIndex = topSort.Input().Maybe<TKqlReadTableIndex>()) {
+ if (auto maybeReadTableIndex = topBase.Input().Maybe<TKqlReadTableIndex>()) {
auto readTableIndex = maybeReadTableIndex.Cast();
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, readTableIndex.Table().Path());
@@ -307,28 +307,30 @@ TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ct
TVector<TString> sortByColumns;
- if (!CanPushTopSort(topSort, indexDesc, &sortByColumns)) {
+ if (!CanPushTopSort(topBase, indexDesc, &sortByColumns)) {
return node;
}
- auto filter = [&ctx, &node, &topSort](const TExprBase& in) mutable {
- auto newTopSort = Build<TCoTopSort>(ctx, node.Pos())
+ auto filter = [&ctx, &node, &topBase](const TExprBase& in) mutable {
+ auto newTop = Build<TCoTopBase>(ctx, node.Pos())
+ .CallableName(node.Ref().Content())
.Input(in)
- .KeySelectorLambda(ctx.DeepCopyLambda(topSort.KeySelectorLambda().Ref()))
- .SortDirections(topSort.SortDirections())
- .Count(topSort.Count())
+ .KeySelectorLambda(ctx.DeepCopyLambda(topBase.KeySelectorLambda().Ref()))
+ .SortDirections(topBase.SortDirections())
+ .Count(topBase.Count())
.Done();
- return TExprBase(newTopSort);
+ return TExprBase(newTop);
};
auto lookup = DoRewriteIndexRead(readTableIndex, ctx, tableDesc, indexMeta,
kqpCtx.IsScanQuery(), sortByColumns, filter);
- return Build<TCoTopSort>(ctx, node.Pos())
+ return Build<TCoTopBase>(ctx, node.Pos())
+ .CallableName(node.Ref().Content())
.Input(lookup)
- .KeySelectorLambda(ctx.DeepCopyLambda(topSort.KeySelectorLambda().Ref()))
- .SortDirections(topSort.SortDirections())
- .Count(topSort.Count())
+ .KeySelectorLambda(ctx.DeepCopyLambda(topBase.KeySelectorLambda().Ref()))
+ .SortDirections(topBase.SortDirections())
+ .Count(topBase.Count())
.Done();
}
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp
index 644242d1d7b..641bb926290 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sort.cpp
@@ -11,12 +11,12 @@ using namespace NYql::NDq;
using namespace NYql::NNodes;
TExprBase KqpTopSortOverExtend(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents) {
- if (!node.Maybe<TCoTopSort>().Input().Maybe<TCoExtend>()) {
+ if (!node.Maybe<TCoTopBase>().Input().Maybe<TCoExtend>()) {
return node;
}
- auto topSort = node.Cast<TCoTopSort>();
- auto extend = topSort.Input().Cast<TCoExtend>();
+ auto topBase = node.Cast<TCoTopBase>();
+ auto extend = topBase.Input().Cast<TCoExtend>();
if (!IsSingleConsumer(extend, parents)) {
return node;
@@ -25,11 +25,12 @@ TExprBase KqpTopSortOverExtend(NNodes::TExprBase node, TExprContext& ctx, const
TVector<TExprBase> inputs;
inputs.reserve(extend.ArgCount());
for (const auto& arg : extend) {
- auto input = Build<TCoTopSort>(ctx, node.Pos())
+ auto input = Build<TCoTopBase>(ctx, node.Pos())
+ .CallableName(node.Ref().Content())
.Input(arg)
- .Count(topSort.Count())
- .SortDirections(topSort.SortDirections())
- .KeySelectorLambda(topSort.KeySelectorLambda())
+ .Count(topBase.Count())
+ .SortDirections(topBase.SortDirections())
+ .KeySelectorLambda(topBase.KeySelectorLambda())
.Done();
inputs.push_back(input);
@@ -41,10 +42,10 @@ TExprBase KqpTopSortOverExtend(NNodes::TExprBase node, TExprContext& ctx, const
.Input<TCoExtend>()
.Add(inputs)
.Build()
- .SortDirections(topSort.SortDirections())
- .KeySelectorLambda(topSort.KeySelectorLambda())
+ .SortDirections(topBase.SortDirections())
+ .KeySelectorLambda(topBase.KeySelectorLambda())
.Build()
- .Count(topSort.Count())
+ .Count(topBase.Count())
.Done();
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 1429e140d87..cb093f1c7dd 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -51,6 +51,7 @@ public:
AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>));
AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<false>));
AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<false>));
+ AddHandler(0, &TCoTop::Match, HNDL(BuildTopStage<false>));
AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>));
AddHandler(0, &TCoTakeBase::Match, HNDL(BuildTakeSkipStage<false>));
AddHandler(0, &TCoSortBase::Match, HNDL(BuildSortStage<false>));
@@ -98,6 +99,7 @@ public:
AddHandler(1, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<true>));
AddHandler(1, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<true>));
AddHandler(1, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<true>));
+ AddHandler(1, &TCoTop::Match, HNDL(BuildTopStage<true>));
AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>));
AddHandler(1, &TCoTakeBase::Match, HNDL(BuildTakeSkipStage<true>));
AddHandler(1, &TCoSortBase::Match, HNDL(BuildSortStage<true>));
@@ -292,6 +294,15 @@ protected:
}
template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildTopStage(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ TExprBase output = DqBuildTopStage(node, ctx, optCtx, *getParents(), IsGlobal);
+ DumpAppliedRule("BuildTopStage", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
+ template <bool IsGlobal>
TMaybeNode<TExprBase> BuildTopSortStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index a277e75a499..278af745cd9 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -1548,7 +1548,6 @@
"Name": "TCoTopBase",
"Base": "TCoInputBase",
"Match": {"Type": "CallableBase"},
- "Builder": {"Generate": "None"},
"Children": [
{"Index": 1, "Name": "Count", "Type": "TExprBase"},
{"Index": 2, "Name": "SortDirections", "Type": "TExprBase"},
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 4fdc19f8470..98c06c77754 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1536,6 +1536,74 @@ TExprBase GetSortDirection(TExprBase& sortDirections, size_t index) {
}
} // End of anonymous namespace
+TExprBase DqBuildTopStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
+ if (!node.Maybe<TCoTop>().Input().Maybe<TDqCnUnionAll>()) {
+ return node;
+ }
+
+ const auto top = node.Cast<TCoTop>();
+ const auto dqUnion = top.Input().Cast<TDqCnUnionAll>();
+ if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
+ return node;
+ }
+
+ if (!CanPushDqExpr(top.Count(), dqUnion) || !CanPushDqExpr(top.KeySelectorLambda(), dqUnion)) {
+ return node;
+ }
+
+ if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) {
+ return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTopSort::idx_Input, std::move(connToPushableStage)));
+ }
+
+ const auto result = dqUnion.Output().Stage().Program().Body();
+
+ const auto sortKeySelector = top.KeySelectorLambda();
+ const auto sortDirections = top.SortDirections();
+ const auto lambda = Build<TCoLambda>(ctx, top.Pos())
+ .Args({"stream"})
+ .Body<TCoTop>()
+ .Input("stream")
+ .KeySelectorLambda(ctx.DeepCopyLambda(top.KeySelectorLambda().Ref()))
+ .SortDirections(sortDirections)
+ .Count(top.Count())
+ .Build()
+ .Done();
+
+ const auto stage = dqUnion.Output().Stage().Cast<TDqStage>();
+ const auto newStage = DqPushLambdaToStage(stage, dqUnion.Output().Index(), lambda, {}, ctx, optCtx);
+ if (!newStage) {
+ return node;
+ }
+
+ return Build<TDqCnUnionAll>(ctx, node.Pos())
+ .Output()
+ .Stage<TDqStage>()
+ .Inputs()
+ .Add<TDqCnUnionAll>()
+ .Output()
+ .Stage(newStage.Cast())
+ .Index(dqUnion.Output().Index())
+ .Build()
+ .Build()
+ .Build()
+ .Program()
+ .Args({"stream"})
+ .Body<TCoTop>()
+ .Input("stream")
+ .KeySelectorLambda(ctx.DeepCopyLambda(top.KeySelectorLambda().Ref()))
+ .SortDirections(top.SortDirections())
+ .Count(top.Count())
+ .Build()
+ .Build()
+ .Settings(TDqStageSettings().BuildNode(ctx, top.Pos()))
+ .Build()
+ .Index().Build(0U)
+ .Build()
+ .Done();
+}
+
TExprBase DqBuildTopSortStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 06a78fc48e9..da514ea422c 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -60,6 +60,9 @@ NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprCont
NNodes::TExprBase DqBuildAggregationResultStage(NNodes::TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx);
+NNodes::TExprBase DqBuildTopStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
+
NNodes::TExprBase DqBuildTopSortStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);