diff options
author | Sergei Puchin <s.puchin@gmail.com> | 2022-04-06 18:48:44 +0300 |
---|---|---|
committer | Sergei Puchin <s.puchin@gmail.com> | 2022-04-06 18:48:44 +0300 |
commit | b9feef3f0e8a03df3e086d9a86f870d6a4740acb (patch) | |
tree | f2e08fbb8fe6c7dc4fe8b0435eb38cb63519900d | |
parent | 40b71ca5f57840f038abe9fd4f0615f92b8dece8 (diff) | |
download | ydb-b9feef3f0e8a03df3e086d9a86f870d6a4740acb.tar.gz |
Switch back to legacy Length phy rule for YQ. (KIKIMR-14596)
ref:c99ee700204c177649e95b3167db1f8221921142
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 114 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 8 |
3 files changed, 120 insertions, 5 deletions
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index bbe93cc6edc..9df0e144c5e 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1340,6 +1340,120 @@ TExprBase DqRewriteLengthOfStageOutput(TExprBase node, TExprContext& ctx, IOptim return precompute; } +// TODO: Remove once precomputes are supported in YQ +TExprBase DqRewriteLengthOfStageOutputLegacy(TExprBase node, TExprContext& ctx, IOptimizationContext&) { + if (!node.Maybe<TCoLength>().List().Maybe<TDqCnUnionAll>()) { + return node; + } + + auto dqUnion = node.Cast<TCoLength>().List().Cast<TDqCnUnionAll>(); + + auto zero = Build<TCoUint64>(ctx, node.Pos()) + .Literal().Build("0") + .Done(); + + auto field = BuildAtom("_dq_agg_cnt", node.Pos(), ctx); + + auto combine = Build<TCoCombineByKey>(ctx, node.Pos()) + .Input(dqUnion) + .PreMapLambda() + .Args({"item"}) + .Body<TCoJust>() + .Input("item") + .Build() + .Build() + .KeySelectorLambda() + .Args({"item"}) + .Body(zero) + .Build() + .InitHandlerLambda() + .Args({"key", "item"}) + .Body<TCoUint64>() + .Literal().Build("1") + .Build() + .Build() + .UpdateHandlerLambda() + .Args({"key", "item", "state"}) + .Body<TCoInc>() + .Value("state") + .Build() + .Build() + .FinishHandlerLambda() + .Args({"key", "state"}) + .Body<TCoJust>() + .Input<TCoAsStruct>() + .Add<TCoNameValueTuple>() + .Name(field) + .Value("state") + .Build() + .Build() + .Build() + .Build() + .Done(); + + const auto stub = MakeBool<false>(node.Pos(), ctx); + + auto partition = Build<TCoPartitionsByKeys>(ctx, node.Pos()) + .Input(combine) + .KeySelectorLambda() + .Args({"item"}) + .Body(stub) + .Build() + .SortDirections<TCoVoid>() + .Build() + .SortKeySelectorLambda<TCoVoid>() + .Build() + .ListHandlerLambda() + .Args({"list"}) + .Body<TCoCondense1>() + .Input("list") + .InitHandler(BuildIdentityLambda(node.Pos(), ctx)) // take struct from CombineByKey result + .SwitchHandler() + .Args({"item", "state"}) + .Body(stub) + .Build() + .UpdateHandler() + .Args({"item", "state"}) + .Body<TCoAsStruct>() + .Add<TCoNameValueTuple>() + .Name(field) + .Value<TCoAggrAdd>() + .Left<TCoMember>() + .Struct("state") + .Name(field) + .Build() + .Right<TCoMember>() + .Struct("item") + .Name(field) + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Done(); + + auto toOptional = Build<TCoToOptional>(ctx, node.Pos()) + .List(partition) + .Done(); + + auto coalesce = Build<TCoCoalesce>(ctx, node.Pos()) + .Predicate(toOptional) + .Value<TCoAsStruct>() + .Add<TCoNameValueTuple>() + .Name(field) + .Value(zero) + .Build() + .Build() + .Done(); + + return Build<TCoMember>(ctx, node.Pos()) + .Struct(coalesce) + .Name(field) + .Done(); +} + TExprBase DqBuildPureExprStage(TExprBase node, TExprContext& ctx) { if (!IsDqPureExpr(node)) { return node; diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 4bf3a994839..7013aa72a47 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -63,6 +63,9 @@ NNodes::TExprBase DqBuildTakeSkipStage(NNodes::TExprBase node, TExprContext& ctx NNodes::TExprBase DqRewriteLengthOfStageOutput(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage); +NNodes::TExprBase DqRewriteLengthOfStageOutputLegacy(NNodes::TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx); + NNodes::TExprBase DqRewriteRightJoinToLeft(const NNodes::TExprBase node, TExprContext& ctx); NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprContext& ctx, diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 552f5173ea9..f574c17433f 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -33,7 +33,7 @@ public: AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>)); AddHandler(0, &TCoSort::Match, HNDL(BuildSortStage<false>)); AddHandler(0, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<false>)); - AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput<false>)); + AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput)); AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage)); AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft)); AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>)); @@ -55,7 +55,6 @@ public: AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>)); AddHandler(1, &TCoSort::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<true>)); - AddHandler(1, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput<true>)); AddHandler(1, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<true>)); AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>)); AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>)); @@ -273,9 +272,8 @@ protected: } } - template <bool IsGlobal> - TMaybeNode<TExprBase> RewriteLengthOfStageOutput(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - return DqRewriteLengthOfStageOutput(node, ctx, optCtx, *getParents(), IsGlobal); + TMaybeNode<TExprBase> RewriteLengthOfStageOutput(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) { + return DqRewriteLengthOfStageOutputLegacy(node, ctx, optCtx); } TMaybeNode<TExprBase> BuildExtendStage(TExprBase node, TExprContext& ctx) { |