aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.ru>2022-06-02 20:43:08 +0300
committerudovichenko-r <udovichenko-r@yandex-team.ru>2022-06-02 20:43:08 +0300
commit1eb3b7819d2cc47d791f522a87d1986c7b715d38 (patch)
tree533cb6e7fe79ccbaabc8b7f7fc7179f7ae9e9718
parentd48705910726563d711e5ed8ccff62a8a2cda516 (diff)
downloadydb-1eb3b7819d2cc47d791f522a87d1986c7b715d38.tar.gz
[yql] Move some DqPrecompute optimizers
YQL-12393 ref:e2fdecf7892f2b9c7163c0b3976622dbc87e1b67
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp11
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp84
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h6
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp172
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h8
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp61
6 files changed, 171 insertions, 171 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index fb4814170e6..27046fb0c7c 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -58,6 +58,7 @@ public:
AddHandler(0, &TKqlInsertRowsIndex::Match, HNDL(BuildInsertIndexStages));
AddHandler(0, &TKqlDeleteRowsIndex::Match, HNDL(BuildDeleteIndexStages));
AddHandler(0, &TCoUnorderedBase::Match, HNDL(DropUnordered));
+ AddHandler(0, &TDqStage::Match, HNDL(PrecomputeToInput));
AddHandler(0, &TDqStage::Match, HNDL(FloatUpStage));
AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>));
@@ -351,7 +352,7 @@ protected:
TMaybeNode<TExprBase> PropagatePrecomuteTake(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
- TExprBase output = KqpPropagatePrecomuteTake(node, ctx, optCtx, *getParents(), IsGlobal);
+ TExprBase output = DqPropagatePrecomuteTake(node, ctx, optCtx, *getParents(), IsGlobal);
DumpAppliedRule("PropagatePrecomuteTake", node.Ptr(), output.Ptr(), ctx);
return output;
}
@@ -360,11 +361,17 @@ protected:
TMaybeNode<TExprBase> PropagatePrecomuteFlatmap(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
- TExprBase output = KqpPropagatePrecomuteFlatmap(node, ctx, optCtx, *getParents(), IsGlobal);
+ TExprBase output = DqPropagatePrecomuteFlatmap(node, ctx, optCtx, *getParents(), IsGlobal);
DumpAppliedRule("PropagatePrecomuteFlatmap", node.Ptr(), output.Ptr(), ctx);
return output;
}
+ TMaybeNode<TExprBase> PrecomputeToInput(TExprBase node, TExprContext& ctx) {
+ TExprBase output = DqPrecomputeToInput(node, ctx);
+ DumpAppliedRule("PrecomputeToInput", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
private:
TTypeAnnotationContext& TypesCtx;
const TKqpOptimizeContext& KqpCtx;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
index 6be9792004f..b009486741d 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
@@ -104,89 +104,5 @@ TExprBase KqpPropagatePrecomuteScalarRowset(TExprBase node, TExprContext& ctx, I
.Done();
}
-TExprBase KqpPropagatePrecomuteTake(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
- const TParentsMap& parentsMap, bool allowStageMultiUsage)
-{
- if (!node.Maybe<TCoTake>().Input().Maybe<TDqPhyPrecompute>()) {
- return node;
- }
-
- auto take = node.Cast<TCoTake>();
- auto precompute = take.Input().Cast<TDqPhyPrecompute>();
-
- if (!IsSingleConsumerConnection(precompute.Connection(), parentsMap, allowStageMultiUsage)) {
- return node;
- }
-
- if (!CanPushDqExpr(take.Count(), precompute.Connection())) {
- return node;
- }
-
- auto takeLambda = Build<TCoLambda>(ctx, node.Pos())
- .Args({"list_stream"})
- .Body<TCoMap>()
- .Input("list_stream")
- .Lambda()
- .Args({"list"})
- .Body<TCoTake>()
- .Input("list")
- .Count(take.Count())
- .Build()
- .Build()
- .Build()
- .Done();
-
- auto result = DqPushLambdaToStageUnionAll(precompute.Connection(), takeLambda, {}, ctx, optCtx);
- if (!result) {
- return node;
- }
-
- return Build<TDqPhyPrecompute>(ctx, node.Pos())
- .Connection(result.Cast())
- .Done();
-}
-
-TExprBase KqpPropagatePrecomuteFlatmap(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
- const TParentsMap& parentsMap, bool allowStageMultiUsage)
-{
- if (!node.Maybe<TCoFlatMap>().Input().Maybe<TDqPhyPrecompute>()) {
- return node;
- }
-
- auto flatmap = node.Cast<TCoFlatMap>();
- auto precompute = flatmap.Input().Cast<TDqPhyPrecompute>();
-
- if (!IsSingleConsumerConnection(precompute.Connection(), parentsMap, allowStageMultiUsage)) {
- return node;
- }
-
- if (!CanPushDqExpr(flatmap.Lambda(), precompute.Connection())) {
- return node;
- }
-
- auto flatmapLambda = Build<TCoLambda>(ctx, node.Pos())
- .Args({"list_stream"})
- .Body<TCoMap>()
- .Input("list_stream")
- .Lambda()
- .Args({"list"})
- .Body<TCoFlatMap>()
- .Input("list")
- .Lambda(flatmap.Lambda())
- .Build()
- .Build()
- .Build()
- .Done();
-
- auto result = DqPushLambdaToStageUnionAll(precompute.Connection(), flatmapLambda, {}, ctx, optCtx);
- if (!result) {
- return node;
- }
-
- return Build<TDqPhyPrecompute>(ctx, node.Pos())
- .Connection(result.Cast())
- .Done();
-}
-
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
index 92207e26f8a..6ec28394c7a 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
@@ -33,10 +33,4 @@ NYql::NNodes::TExprBase KqpFloatUpStage(NYql::NNodes::TExprBase node, NYql::TExp
NYql::NNodes::TExprBase KqpPropagatePrecomuteScalarRowset(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage);
-NYql::NNodes::TExprBase KqpPropagatePrecomuteTake(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
- NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage);
-
-NYql::NNodes::TExprBase KqpPropagatePrecomuteFlatmap(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
- NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage);
-
} // NKikimr::NKqp::NOpt
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index e93e5ba7d1d..ecfe4392a13 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -250,52 +250,20 @@ TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& o
TVector<TCoArgument> newArgs = PrepareArgumentsReplacement(TCoLambda(newProgram).Args(), lambdaInputs, ctx, inputArgReplaces);
TVector<TExprBase> inputNodes;
- // if lambda contains precomputes -> move them to the stage inputs
- {
- TNodeOnNodeOwnedMap precomputesInsideLambda;
- VisitExpr(newProgram, [&precomputesInsideLambda](const TExprNode::TPtr& node) {
- if (node->IsCallable() && node->Content().StartsWith("DqRead")) {
- return false;
- }
- if (TDqPhyPrecompute::Match(node.Get())) {
- precomputesInsideLambda[node.Get()] = node;
- return false;
- }
- return true;
- });
-
- YQL_CLOG(TRACE, CoreDq) << "lambda with " << precomputesInsideLambda.size() << " precomputes and "
- << lambdaInputs.size() << " inputs";
-
- auto prevInputs = stage.Inputs();
-
- inputNodes.reserve(newArgs.size() + precomputesInsideLambda.size());
- inputNodes.insert(inputNodes.end(), prevInputs.begin(), prevInputs.end());
- inputNodes.insert(inputNodes.end(), lambdaInputs.begin(), lambdaInputs.end());
-
- for (auto [raw, ptr]: precomputesInsideLambda) {
- auto it = std::find_if(prevInputs.begin(), prevInputs.end(), [raw=raw](auto x) { return x.Raw() == raw; });
- if (it != prevInputs.end()) {
- ui64 inputIndex = std::distance(prevInputs.begin(), it);
- inputArgReplaces[raw] = newArgs[inputIndex].Ptr();
- } else {
- inputNodes.emplace_back(TExprBase(ptr));
- newArgs.emplace_back(TCoArgument(ctx.NewArgument(raw->Pos(), "precompute")));
- inputArgReplaces[raw] = newArgs.back().Ptr();
- }
- }
- }
+ inputNodes.reserve(newArgs.size());
+ inputNodes.insert(inputNodes.end(), stage.Inputs().begin(), stage.Inputs().end());
+ inputNodes.insert(inputNodes.end(), lambdaInputs.begin(), lambdaInputs.end());
YQL_ENSURE(newArgs.size() == inputNodes.size(), "" << newArgs.size() << " != " << inputNodes.size());
auto newStage = Build<TDqStage>(ctx, stage.Pos())
.Inputs()
.Add(inputNodes)
- .Build()
+ .Build()
.Program()
.Args(newArgs)
.Body(ctx.ReplaceNodes(newProgram->TailPtr(), inputArgReplaces))
- .Build()
+ .Build()
.Settings(TDqStageSettings().BuildNode(ctx, stage.Pos()))
.Done();
@@ -1757,4 +1725,134 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx);
}
+TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) {
+ if (!node.Maybe<TDqStageBase>()) {
+ return node;
+ }
+
+ auto stage = node.Cast<TDqStageBase>();
+
+ TExprNode::TListType innerPrecomputes = FindNodes(stage.Program().Ptr(),
+ [](const TExprNode::TPtr& node) {
+ return ETypeAnnotationKind::World != node->GetTypeAnn()->GetKind() && !TDqPhyPrecompute::Match(node.Get());
+ },
+ [](const TExprNode::TPtr& node) {
+ return TDqPhyPrecompute::Match(node.Get());
+ }
+ );
+
+ if (innerPrecomputes.empty()) {
+ return node;
+ }
+
+ TExprNode::TListType newInputs;
+ TExprNode::TListType newArgs;
+ TNodeOnNodeOwnedMap replaces;
+
+ for (ui64 i = 0; i < stage.Inputs().Size(); ++i) {
+ newInputs.push_back(stage.Inputs().Item(i).Ptr());
+ auto arg = stage.Program().Args().Arg(i).Raw();
+ newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content()));
+ replaces[arg] = newArgs.back();
+ }
+
+ for (auto& precompute: innerPrecomputes) {
+ newInputs.push_back(precompute);
+ newArgs.push_back(ctx.NewArgument(precompute->Pos(), TStringBuilder() << "_dq_precompute_" << newArgs.size()));
+ replaces[precompute.Get()] = newArgs.back();
+ }
+
+ TExprNode::TListType children = stage.Ref().ChildrenList();
+ children[TDqStageBase::idx_Inputs] = ctx.NewList(stage.Inputs().Pos(), std::move(newInputs));
+ children[TDqStageBase::idx_Program] = ctx.NewLambda(stage.Program().Pos(),
+ ctx.NewArguments(stage.Program().Args().Pos(), std::move(newArgs)),
+ ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces));
+
+ return TExprBase(ctx.ChangeChildren(stage.Ref(), std::move(children)));
+}
+
+TExprBase DqPropagatePrecomuteTake(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
+ if (!node.Maybe<TCoTake>().Input().Maybe<TDqPhyPrecompute>()) {
+ return node;
+ }
+
+ auto take = node.Cast<TCoTake>();
+ auto precompute = take.Input().Cast<TDqPhyPrecompute>();
+
+ if (!IsSingleConsumerConnection(precompute.Connection(), parentsMap, allowStageMultiUsage)) {
+ return node;
+ }
+
+ if (!CanPushDqExpr(take.Count(), precompute.Connection())) {
+ return node;
+ }
+
+ auto takeLambda = Build<TCoLambda>(ctx, node.Pos())
+ .Args({"list_stream"})
+ .Body<TCoMap>()
+ .Input("list_stream")
+ .Lambda()
+ .Args({"list"})
+ .Body<TCoTake>()
+ .Input("list")
+ .Count(take.Count())
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+
+ auto result = DqPushLambdaToStageUnionAll(precompute.Connection(), takeLambda, {}, ctx, optCtx);
+ if (!result) {
+ return node;
+ }
+
+ return Build<TDqPhyPrecompute>(ctx, node.Pos())
+ .Connection(result.Cast())
+ .Done();
+}
+
+TExprBase DqPropagatePrecomuteFlatmap(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
+ if (!node.Maybe<TCoFlatMap>().Input().Maybe<TDqPhyPrecompute>()) {
+ return node;
+ }
+
+ auto flatmap = node.Cast<TCoFlatMap>();
+ auto precompute = flatmap.Input().Cast<TDqPhyPrecompute>();
+
+ if (!IsSingleConsumerConnection(precompute.Connection(), parentsMap, allowStageMultiUsage)) {
+ return node;
+ }
+
+ if (!CanPushDqExpr(flatmap.Lambda(), precompute.Connection())) {
+ return node;
+ }
+
+ auto flatmapLambda = Build<TCoLambda>(ctx, node.Pos())
+ .Args({"list_stream"})
+ .Body<TCoMap>()
+ .Input("list_stream")
+ .Lambda()
+ .Args({"list"})
+ .Body<TCoFlatMap>()
+ .Input("list")
+ .Lambda(flatmap.Lambda())
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+
+ auto result = DqPushLambdaToStageUnionAll(precompute.Connection(), flatmapLambda, {}, ctx, optCtx);
+ if (!result) {
+ return node;
+ }
+
+ return Build<TDqPhyPrecompute>(ctx, node.Pos())
+ .Connection(result.Cast())
+ .Done();
+}
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 175ee25c300..de152bdf1b4 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -96,4 +96,12 @@ NYql::NNodes::TExprBase DqBuildHasItems(NYql::NNodes::TExprBase node, NYql::TExp
NYql::NNodes::TExprBase DqBuildScalarPrecompute(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
NYql::IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage);
+NYql::NNodes::TExprBase DqPrecomputeToInput(const NYql::NNodes::TExprBase& node, TExprContext& ctx);
+
+NYql::NNodes::TExprBase DqPropagatePrecomuteTake(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+ NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage);
+
+NYql::NNodes::TExprBase DqPropagatePrecomuteFlatmap(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+ NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, bool allowStageMultiUsage);
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 2a14933488a..77713e64c4c 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -50,6 +50,8 @@ public:
AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>));
AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecompute));
AddHandler(0, &TDqStage::Match, HNDL(PrecomputeToInput));
+ AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>));
+ AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
}
AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>));
@@ -65,8 +67,10 @@ public:
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
if (enablePrecompute) {
- AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>));
- AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>));
+ AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>));
+ AddHandler(1, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>));
+ AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>));
+ AddHandler(1, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<true>));
}
#undef HNDL
@@ -312,48 +316,21 @@ protected:
}
TMaybeNode<TExprBase> PrecomputeToInput(TExprBase node, TExprContext& ctx) {
- auto stage = node.Cast<TDqStage>();
-
- TExprNode::TListType innerPrecomputes = FindNodes(stage.Program().Ptr(),
- [](const TExprNode::TPtr& node) {
- return !TDqReadWrapBase::Match(node.Get()) && !TDqPhyPrecompute::Match(node.Get());
- },
- [](const TExprNode::TPtr& node) {
- return TDqPhyPrecompute::Match(node.Get());
- }
- );
-
- if (innerPrecomputes.empty()) {
- return node;
- }
-
- TVector<TExprNode::TPtr> newInputs;
- TVector<TExprNode::TPtr> newArgs;
- TNodeOnNodeOwnedMap replaces;
-
- for (ui64 i = 0; i < stage.Inputs().Size(); ++i) {
- newInputs.push_back(stage.Inputs().Item(i).Ptr());
- auto arg = stage.Program().Args().Arg(i).Raw();
- newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content()));
- replaces[arg] = newArgs.back();
- }
+ return DqPrecomputeToInput(node, ctx);
+ }
- for (auto& precompute: innerPrecomputes) {
- newInputs.push_back(precompute);
- newArgs.push_back(ctx.NewArgument(precompute->Pos(), TStringBuilder() << "_dq_precompute_" << newArgs.size()));
- replaces[precompute.Get()] = newArgs.back();
- }
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> PropagatePrecomuteTake(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ return DqPropagatePrecomuteTake(node, ctx, optCtx, *getParents(), IsGlobal);
+ }
- return Build<TDqStage>(ctx, stage.Pos())
- .Inputs()
- .Add(newInputs)
- .Build()
- .Program()
- .Args(newArgs)
- .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces))
- .Build()
- .Settings().Build()
- .Done();
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> PropagatePrecomuteFlatmap(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ return DqPropagatePrecomuteFlatmap(node, ctx, optCtx, *getParents(), IsGlobal);
}
private: