diff options
author | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-06-02 20:43:08 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-06-02 20:43:08 +0300 |
commit | 1eb3b7819d2cc47d791f522a87d1986c7b715d38 (patch) | |
tree | 533cb6e7fe79ccbaabc8b7f7fc7179f7ae9e9718 | |
parent | d48705910726563d711e5ed8ccff62a8a2cda516 (diff) | |
download | ydb-1eb3b7819d2cc47d791f522a87d1986c7b715d38.tar.gz |
[yql] Move some DqPrecompute optimizers
YQL-12393
ref:e2fdecf7892f2b9c7163c0b3976622dbc87e1b67
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp | 84 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h | 6 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 172 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 8 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 61 |
6 files changed, 171 insertions, 171 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index fb4814170e6..27046fb0c7c 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -58,6 +58,7 @@ public: AddHandler(0, &TKqlInsertRowsIndex::Match, HNDL(BuildInsertIndexStages)); AddHandler(0, &TKqlDeleteRowsIndex::Match, HNDL(BuildDeleteIndexStages)); AddHandler(0, &TCoUnorderedBase::Match, HNDL(DropUnordered)); + AddHandler(0, &TDqStage::Match, HNDL(PrecomputeToInput)); AddHandler(0, &TDqStage::Match, HNDL(FloatUpStage)); AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>)); @@ -351,7 +352,7 @@ protected: TMaybeNode<TExprBase> PropagatePrecomuteTake(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - TExprBase output = KqpPropagatePrecomuteTake(node, ctx, optCtx, *getParents(), IsGlobal); + TExprBase output = DqPropagatePrecomuteTake(node, ctx, optCtx, *getParents(), IsGlobal); DumpAppliedRule("PropagatePrecomuteTake", node.Ptr(), output.Ptr(), ctx); return output; } @@ -360,11 +361,17 @@ protected: TMaybeNode<TExprBase> PropagatePrecomuteFlatmap(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - TExprBase output = KqpPropagatePrecomuteFlatmap(node, ctx, optCtx, *getParents(), IsGlobal); + TExprBase output = DqPropagatePrecomuteFlatmap(node, ctx, optCtx, *getParents(), IsGlobal); DumpAppliedRule("PropagatePrecomuteFlatmap", node.Ptr(), output.Ptr(), ctx); return output; } + TMaybeNode<TExprBase> PrecomputeToInput(TExprBase node, TExprContext& ctx) { + TExprBase output = DqPrecomputeToInput(node, ctx); + DumpAppliedRule("PrecomputeToInput", node.Ptr(), output.Ptr(), ctx); + return output; + } + private: TTypeAnnotationContext& TypesCtx; const TKqpOptimizeContext& KqpCtx; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp index 6be9792004f..b009486741d 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp @@ -104,89 +104,5 @@ TExprBase KqpPropagatePrecomuteScalarRowset(TExprBase node, TExprContext& ctx, I .Done(); } -TExprBase KqpPropagatePrecomuteTake(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage) -{ - if (!node.Maybe<TCoTake>().Input().Maybe<TDqPhyPrecompute>()) { - return node; - } - - auto take = node.Cast<TCoTake>(); - auto precompute = take.Input().Cast<TDqPhyPrecompute>(); - - if (!IsSingleConsumerConnection(precompute.Connection(), parentsMap, allowStageMultiUsage)) { - return node; - } - - if (!CanPushDqExpr(take.Count(), precompute.Connection())) { - return node; - } - - auto takeLambda = Build<TCoLambda>(ctx, node.Pos()) - .Args({"list_stream"}) - .Body<TCoMap>() - .Input("list_stream") - .Lambda() - .Args({"list"}) - .Body<TCoTake>() - .Input("list") - .Count(take.Count()) - .Build() - .Build() - .Build() - .Done(); - - auto result = DqPushLambdaToStageUnionAll(precompute.Connection(), takeLambda, {}, ctx, optCtx); - if (!result) { - return node; - } - - return Build<TDqPhyPrecompute>(ctx, node.Pos()) - .Connection(result.Cast()) - .Done(); -} - -TExprBase KqpPropagatePrecomuteFlatmap(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage) -{ - if (!node.Maybe<TCoFlatMap>().Input().Maybe<TDqPhyPrecompute>()) { - return node; - } - - auto flatmap = node.Cast<TCoFlatMap>(); - auto precompute = flatmap.Input().Cast<TDqPhyPrecompute>(); - - if (!IsSingleConsumerConnection(precompute.Connection(), parentsMap, allowStageMultiUsage)) { - return node; - } - - if (!CanPushDqExpr(flatmap.Lambda(), precompute.Connection())) { - return node; - } - - auto flatmapLambda = Build<TCoLambda>(ctx, node.Pos()) - .Args({"list_stream"}) - .Body<TCoMap>() - .Input("list_stream") - .Lambda() - .Args({"list"}) - .Body<TCoFlatMap>() - .Input("list") - .Lambda(flatmap.Lambda()) - .Build() - .Build() - .Build() - .Done(); - - auto result = DqPushLambdaToStageUnionAll(precompute.Connection(), flatmapLambda, {}, ctx, optCtx); - if (!result) { - return node; - } - - return Build<TDqPhyPrecompute>(ctx, node.Pos()) - .Connection(result.Cast()) - .Done(); -} - } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h index 92207e26f8a..6ec28394c7a 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h @@ -33,10 +33,4 @@ NYql::NNodes::TExprBase KqpFloatUpStage(NYql::NNodes::TExprBase node, NYql::TExp NYql::NNodes::TExprBase KqpPropagatePrecomuteScalarRowset(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage); -NYql::NNodes::TExprBase KqpPropagatePrecomuteTake(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, - NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage); - -NYql::NNodes::TExprBase KqpPropagatePrecomuteFlatmap(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, - NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage); - } // NKikimr::NKqp::NOpt diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index e93e5ba7d1d..ecfe4392a13 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -250,52 +250,20 @@ TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& o TVector<TCoArgument> newArgs = PrepareArgumentsReplacement(TCoLambda(newProgram).Args(), lambdaInputs, ctx, inputArgReplaces); TVector<TExprBase> inputNodes; - // if lambda contains precomputes -> move them to the stage inputs - { - TNodeOnNodeOwnedMap precomputesInsideLambda; - VisitExpr(newProgram, [&precomputesInsideLambda](const TExprNode::TPtr& node) { - if (node->IsCallable() && node->Content().StartsWith("DqRead")) { - return false; - } - if (TDqPhyPrecompute::Match(node.Get())) { - precomputesInsideLambda[node.Get()] = node; - return false; - } - return true; - }); - - YQL_CLOG(TRACE, CoreDq) << "lambda with " << precomputesInsideLambda.size() << " precomputes and " - << lambdaInputs.size() << " inputs"; - - auto prevInputs = stage.Inputs(); - - inputNodes.reserve(newArgs.size() + precomputesInsideLambda.size()); - inputNodes.insert(inputNodes.end(), prevInputs.begin(), prevInputs.end()); - inputNodes.insert(inputNodes.end(), lambdaInputs.begin(), lambdaInputs.end()); - - for (auto [raw, ptr]: precomputesInsideLambda) { - auto it = std::find_if(prevInputs.begin(), prevInputs.end(), [raw=raw](auto x) { return x.Raw() == raw; }); - if (it != prevInputs.end()) { - ui64 inputIndex = std::distance(prevInputs.begin(), it); - inputArgReplaces[raw] = newArgs[inputIndex].Ptr(); - } else { - inputNodes.emplace_back(TExprBase(ptr)); - newArgs.emplace_back(TCoArgument(ctx.NewArgument(raw->Pos(), "precompute"))); - inputArgReplaces[raw] = newArgs.back().Ptr(); - } - } - } + inputNodes.reserve(newArgs.size()); + inputNodes.insert(inputNodes.end(), stage.Inputs().begin(), stage.Inputs().end()); + inputNodes.insert(inputNodes.end(), lambdaInputs.begin(), lambdaInputs.end()); YQL_ENSURE(newArgs.size() == inputNodes.size(), "" << newArgs.size() << " != " << inputNodes.size()); auto newStage = Build<TDqStage>(ctx, stage.Pos()) .Inputs() .Add(inputNodes) - .Build() + .Build() .Program() .Args(newArgs) .Body(ctx.ReplaceNodes(newProgram->TailPtr(), inputArgReplaces)) - .Build() + .Build() .Settings(TDqStageSettings().BuildNode(ctx, stage.Pos())) .Done(); @@ -1757,4 +1725,134 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx); } +TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) { + if (!node.Maybe<TDqStageBase>()) { + return node; + } + + auto stage = node.Cast<TDqStageBase>(); + + TExprNode::TListType innerPrecomputes = FindNodes(stage.Program().Ptr(), + [](const TExprNode::TPtr& node) { + return ETypeAnnotationKind::World != node->GetTypeAnn()->GetKind() && !TDqPhyPrecompute::Match(node.Get()); + }, + [](const TExprNode::TPtr& node) { + return TDqPhyPrecompute::Match(node.Get()); + } + ); + + if (innerPrecomputes.empty()) { + return node; + } + + TExprNode::TListType newInputs; + TExprNode::TListType newArgs; + TNodeOnNodeOwnedMap replaces; + + for (ui64 i = 0; i < stage.Inputs().Size(); ++i) { + newInputs.push_back(stage.Inputs().Item(i).Ptr()); + auto arg = stage.Program().Args().Arg(i).Raw(); + newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content())); + replaces[arg] = newArgs.back(); + } + + for (auto& precompute: innerPrecomputes) { + newInputs.push_back(precompute); + newArgs.push_back(ctx.NewArgument(precompute->Pos(), TStringBuilder() << "_dq_precompute_" << newArgs.size())); + replaces[precompute.Get()] = newArgs.back(); + } + + TExprNode::TListType children = stage.Ref().ChildrenList(); + children[TDqStageBase::idx_Inputs] = ctx.NewList(stage.Inputs().Pos(), std::move(newInputs)); + children[TDqStageBase::idx_Program] = ctx.NewLambda(stage.Program().Pos(), + ctx.NewArguments(stage.Program().Args().Pos(), std::move(newArgs)), + ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces)); + + return TExprBase(ctx.ChangeChildren(stage.Ref(), std::move(children))); +} + +TExprBase DqPropagatePrecomuteTake(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ + if (!node.Maybe<TCoTake>().Input().Maybe<TDqPhyPrecompute>()) { + return node; + } + + auto take = node.Cast<TCoTake>(); + auto precompute = take.Input().Cast<TDqPhyPrecompute>(); + + if (!IsSingleConsumerConnection(precompute.Connection(), parentsMap, allowStageMultiUsage)) { + return node; + } + + if (!CanPushDqExpr(take.Count(), precompute.Connection())) { + return node; + } + + auto takeLambda = Build<TCoLambda>(ctx, node.Pos()) + .Args({"list_stream"}) + .Body<TCoMap>() + .Input("list_stream") + .Lambda() + .Args({"list"}) + .Body<TCoTake>() + .Input("list") + .Count(take.Count()) + .Build() + .Build() + .Build() + .Done(); + + auto result = DqPushLambdaToStageUnionAll(precompute.Connection(), takeLambda, {}, ctx, optCtx); + if (!result) { + return node; + } + + return Build<TDqPhyPrecompute>(ctx, node.Pos()) + .Connection(result.Cast()) + .Done(); +} + +TExprBase DqPropagatePrecomuteFlatmap(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ + if (!node.Maybe<TCoFlatMap>().Input().Maybe<TDqPhyPrecompute>()) { + return node; + } + + auto flatmap = node.Cast<TCoFlatMap>(); + auto precompute = flatmap.Input().Cast<TDqPhyPrecompute>(); + + if (!IsSingleConsumerConnection(precompute.Connection(), parentsMap, allowStageMultiUsage)) { + return node; + } + + if (!CanPushDqExpr(flatmap.Lambda(), precompute.Connection())) { + return node; + } + + auto flatmapLambda = Build<TCoLambda>(ctx, node.Pos()) + .Args({"list_stream"}) + .Body<TCoMap>() + .Input("list_stream") + .Lambda() + .Args({"list"}) + .Body<TCoFlatMap>() + .Input("list") + .Lambda(flatmap.Lambda()) + .Build() + .Build() + .Build() + .Done(); + + auto result = DqPushLambdaToStageUnionAll(precompute.Connection(), flatmapLambda, {}, ctx, optCtx); + if (!result) { + return node; + } + + return Build<TDqPhyPrecompute>(ctx, node.Pos()) + .Connection(result.Cast()) + .Done(); +} + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 175ee25c300..de152bdf1b4 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -96,4 +96,12 @@ NYql::NNodes::TExprBase DqBuildHasItems(NYql::NNodes::TExprBase node, NYql::TExp NYql::NNodes::TExprBase DqBuildScalarPrecompute(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, NYql::IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage); +NYql::NNodes::TExprBase DqPrecomputeToInput(const NYql::NNodes::TExprBase& node, TExprContext& ctx); + +NYql::NNodes::TExprBase DqPropagatePrecomuteTake(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, + NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage); + +NYql::NNodes::TExprBase DqPropagatePrecomuteFlatmap(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, + NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage); + } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 2a14933488a..77713e64c4c 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -50,6 +50,8 @@ public: AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>)); AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecompute)); AddHandler(0, &TDqStage::Match, HNDL(PrecomputeToInput)); + AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>)); + AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>)); } AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>)); @@ -65,8 +67,10 @@ public: AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); if (enablePrecompute) { - AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>)); - AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>)); + AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>)); + AddHandler(1, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>)); + AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>)); + AddHandler(1, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<true>)); } #undef HNDL @@ -312,48 +316,21 @@ protected: } TMaybeNode<TExprBase> PrecomputeToInput(TExprBase node, TExprContext& ctx) { - auto stage = node.Cast<TDqStage>(); - - TExprNode::TListType innerPrecomputes = FindNodes(stage.Program().Ptr(), - [](const TExprNode::TPtr& node) { - return !TDqReadWrapBase::Match(node.Get()) && !TDqPhyPrecompute::Match(node.Get()); - }, - [](const TExprNode::TPtr& node) { - return TDqPhyPrecompute::Match(node.Get()); - } - ); - - if (innerPrecomputes.empty()) { - return node; - } - - TVector<TExprNode::TPtr> newInputs; - TVector<TExprNode::TPtr> newArgs; - TNodeOnNodeOwnedMap replaces; - - for (ui64 i = 0; i < stage.Inputs().Size(); ++i) { - newInputs.push_back(stage.Inputs().Item(i).Ptr()); - auto arg = stage.Program().Args().Arg(i).Raw(); - newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content())); - replaces[arg] = newArgs.back(); - } + return DqPrecomputeToInput(node, ctx); + } - for (auto& precompute: innerPrecomputes) { - newInputs.push_back(precompute); - newArgs.push_back(ctx.NewArgument(precompute->Pos(), TStringBuilder() << "_dq_precompute_" << newArgs.size())); - replaces[precompute.Get()] = newArgs.back(); - } + template <bool IsGlobal> + TMaybeNode<TExprBase> PropagatePrecomuteTake(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + return DqPropagatePrecomuteTake(node, ctx, optCtx, *getParents(), IsGlobal); + } - return Build<TDqStage>(ctx, stage.Pos()) - .Inputs() - .Add(newInputs) - .Build() - .Program() - .Args(newArgs) - .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces)) - .Build() - .Settings().Build() - .Done(); + template <bool IsGlobal> + TMaybeNode<TExprBase> PropagatePrecomuteFlatmap(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + return DqPropagatePrecomuteFlatmap(node, ctx, optCtx, *getParents(), IsGlobal); } private: |