aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergei Puchin <s.puchin@gmail.com>2022-04-06 18:48:44 +0300
committerSergei Puchin <s.puchin@gmail.com>2022-04-06 18:48:44 +0300
commitb9feef3f0e8a03df3e086d9a86f870d6a4740acb (patch)
treef2e08fbb8fe6c7dc4fe8b0435eb38cb63519900d
parent40b71ca5f57840f038abe9fd4f0615f92b8dece8 (diff)
downloadydb-b9feef3f0e8a03df3e086d9a86f870d6a4740acb.tar.gz
Switch back to legacy Length phy rule for YQ. (KIKIMR-14596)
ref:c99ee700204c177649e95b3167db1f8221921142
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp114
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h3
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp8
3 files changed, 120 insertions, 5 deletions
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index bbe93cc6edc..9df0e144c5e 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1340,6 +1340,120 @@ TExprBase DqRewriteLengthOfStageOutput(TExprBase node, TExprContext& ctx, IOptim
return precompute;
}
+// TODO: Remove once precomputes are supported in YQ
+TExprBase DqRewriteLengthOfStageOutputLegacy(TExprBase node, TExprContext& ctx, IOptimizationContext&) {
+ if (!node.Maybe<TCoLength>().List().Maybe<TDqCnUnionAll>()) {
+ return node;
+ }
+
+ auto dqUnion = node.Cast<TCoLength>().List().Cast<TDqCnUnionAll>();
+
+ auto zero = Build<TCoUint64>(ctx, node.Pos())
+ .Literal().Build("0")
+ .Done();
+
+ auto field = BuildAtom("_dq_agg_cnt", node.Pos(), ctx);
+
+ auto combine = Build<TCoCombineByKey>(ctx, node.Pos())
+ .Input(dqUnion)
+ .PreMapLambda()
+ .Args({"item"})
+ .Body<TCoJust>()
+ .Input("item")
+ .Build()
+ .Build()
+ .KeySelectorLambda()
+ .Args({"item"})
+ .Body(zero)
+ .Build()
+ .InitHandlerLambda()
+ .Args({"key", "item"})
+ .Body<TCoUint64>()
+ .Literal().Build("1")
+ .Build()
+ .Build()
+ .UpdateHandlerLambda()
+ .Args({"key", "item", "state"})
+ .Body<TCoInc>()
+ .Value("state")
+ .Build()
+ .Build()
+ .FinishHandlerLambda()
+ .Args({"key", "state"})
+ .Body<TCoJust>()
+ .Input<TCoAsStruct>()
+ .Add<TCoNameValueTuple>()
+ .Name(field)
+ .Value("state")
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+
+ const auto stub = MakeBool<false>(node.Pos(), ctx);
+
+ auto partition = Build<TCoPartitionsByKeys>(ctx, node.Pos())
+ .Input(combine)
+ .KeySelectorLambda()
+ .Args({"item"})
+ .Body(stub)
+ .Build()
+ .SortDirections<TCoVoid>()
+ .Build()
+ .SortKeySelectorLambda<TCoVoid>()
+ .Build()
+ .ListHandlerLambda()
+ .Args({"list"})
+ .Body<TCoCondense1>()
+ .Input("list")
+ .InitHandler(BuildIdentityLambda(node.Pos(), ctx)) // take struct from CombineByKey result
+ .SwitchHandler()
+ .Args({"item", "state"})
+ .Body(stub)
+ .Build()
+ .UpdateHandler()
+ .Args({"item", "state"})
+ .Body<TCoAsStruct>()
+ .Add<TCoNameValueTuple>()
+ .Name(field)
+ .Value<TCoAggrAdd>()
+ .Left<TCoMember>()
+ .Struct("state")
+ .Name(field)
+ .Build()
+ .Right<TCoMember>()
+ .Struct("item")
+ .Name(field)
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+
+ auto toOptional = Build<TCoToOptional>(ctx, node.Pos())
+ .List(partition)
+ .Done();
+
+ auto coalesce = Build<TCoCoalesce>(ctx, node.Pos())
+ .Predicate(toOptional)
+ .Value<TCoAsStruct>()
+ .Add<TCoNameValueTuple>()
+ .Name(field)
+ .Value(zero)
+ .Build()
+ .Build()
+ .Done();
+
+ return Build<TCoMember>(ctx, node.Pos())
+ .Struct(coalesce)
+ .Name(field)
+ .Done();
+}
+
TExprBase DqBuildPureExprStage(TExprBase node, TExprContext& ctx) {
if (!IsDqPureExpr(node)) {
return node;
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 4bf3a994839..7013aa72a47 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -63,6 +63,9 @@ NNodes::TExprBase DqBuildTakeSkipStage(NNodes::TExprBase node, TExprContext& ctx
NNodes::TExprBase DqRewriteLengthOfStageOutput(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage);
+NNodes::TExprBase DqRewriteLengthOfStageOutputLegacy(NNodes::TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx);
+
NNodes::TExprBase DqRewriteRightJoinToLeft(const NNodes::TExprBase node, TExprContext& ctx);
NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprContext& ctx,
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 552f5173ea9..f574c17433f 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -33,7 +33,7 @@ public:
AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>));
AddHandler(0, &TCoSort::Match, HNDL(BuildSortStage<false>));
AddHandler(0, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<false>));
- AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput<false>));
+ AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput));
AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage));
AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft));
AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>));
@@ -55,7 +55,6 @@ public:
AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>));
AddHandler(1, &TCoSort::Match, HNDL(BuildSortStage<true>));
AddHandler(1, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<true>));
- AddHandler(1, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput<true>));
AddHandler(1, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<true>));
AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>));
AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>));
@@ -273,9 +272,8 @@ protected:
}
}
- template <bool IsGlobal>
- TMaybeNode<TExprBase> RewriteLengthOfStageOutput(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
- return DqRewriteLengthOfStageOutput(node, ctx, optCtx, *getParents(), IsGlobal);
+ TMaybeNode<TExprBase> RewriteLengthOfStageOutput(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) {
+ return DqRewriteLengthOfStageOutputLegacy(node, ctx, optCtx);
}
TMaybeNode<TExprBase> BuildExtendStage(TExprBase node, TExprContext& ctx) {