diff options
author | ssmike <ssmike@ydb.tech> | 2023-02-05 02:16:20 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-02-05 02:16:20 +0300 |
commit | bc6c67e2881b48c46bc7ef1dedf6ae2db06bccba (patch) | |
tree | c6ca008539941b38282a23996b609e217c17c4b8 | |
parent | 478f068cf2e44bce08b28c1c1f2e5f8eb67ad2e8 (diff) | |
download | ydb-bc6c67e2881b48c46bc7ef1dedf6ae2db06bccba.tar.gz |
remove dangerous bulk-replaces
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp | 44 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp | 64 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp | 34 |
5 files changed, 106 insertions, 49 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 7a9d8cd8f7..2c898b3a4a 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -97,51 +97,29 @@ NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TE } } - TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")}; - NYql::TNodeOnNodeOwnedMap bodyReplaces; - - bodyReplaces[sourceArg.Raw()] = - Build<TCoExtractMembers>(ctx, node.Pos()) - .Members(readRangesSource.Columns()) - .Input<TCoSkipNullMembers>() - .Input(replaceArg) - .Members().Add(skipNullColumns).Build() - .Build() - .Done().Ptr(); - - NYql::TNodeOnNodeOwnedMap inputsReplaces; settings.SkipNullKeys.clear(); - - auto newSource = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos()) + auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos()) .Table(readRangesSource.Table()) .Columns().Add(columns).Build() .Settings(settings.BuildNode(ctx, source.Settings().Pos())) .RangesExpr(readRangesSource.RangesExpr()) .ExplainPrompt(readRangesSource.ExplainPrompt()) .Done(); - inputsReplaces[readRangesSource.Raw()] = newSource.Ptr(); - - TVector<TCoArgument> args; - for (auto arg : stage.Program().Args()) { - if (arg.Raw() == sourceArg.Raw()) { - args.push_back(replaceArg); - } else { - args.push_back(arg); - } - } + TDqStage replacedSettings = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx); - return Build<TDqStage>(ctx, node.Pos()) - .Settings(stage.Settings()) - .Inputs(TExprList(ctx.ReplaceNodes(stage.Inputs().Ptr(), inputsReplaces))) - .Outputs(stage.Outputs()) - .Program<TCoLambda>() - .Args(args) - .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), bodyReplaces)) + TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")}; + auto replaceExpr = + Build<TCoExtractMembers>(ctx, node.Pos()) + .Members(readRangesSource.Columns()) + .Input<TCoSkipNullMembers>() + .Input(replaceArg) + .Members().Add(skipNullColumns).Build() .Build() .Done(); + + return ReplaceStageArg(replacedSettings, *tableSourceIndex, replaceArg, replaceExpr, ctx); } -//FIXME: simplify KIKIMR-16987 TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if (!node.Maybe<TKqlReadTable>()) { return node; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp index a29f7df2bc..61eafa6e48 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp @@ -103,4 +103,68 @@ bool AllowFuseJoinInputs(TExprBase node) { return true; } +NYql::NNodes::TDqStage ReplaceStageArg(NYql::NNodes::TDqStage stage, size_t inputIndex, + NYql::NNodes::TCoArgument replaceArg, NYql::NNodes::TExprBase bodyExpression, NYql::TExprContext& ctx) +{ + auto sourceArg = stage.Program().Args().Arg(inputIndex); + + size_t index = 0; + TVector<TCoArgument> args; + NYql::TNodeOnNodeOwnedMap bodyReplaces; + for (auto arg : stage.Program().Args()) { + if (arg.Raw() == sourceArg.Raw()) { + args.push_back(replaceArg); + } else { + TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_" << index)}; + args.push_back(replaceArg); + bodyReplaces[arg.Raw()] = replaceArg.Ptr(); + } + index += 1; + } + + bodyReplaces[sourceArg.Raw()] = bodyExpression.Ptr(); + + return Build<TDqStage>(ctx, stage.Pos()) + .Settings(stage.Settings()) + .Inputs(stage.Inputs()) + .Outputs(stage.Outputs()) + .Program<TCoLambda>() + .Args(args) + .Body(TExprBase(ctx.ReplaceNodes(stage.Program().Body().Ptr(), bodyReplaces))) + .Build() + .Done(); +} + +NYql::NNodes::TDqStage ReplaceTableSourceSettings(NYql::NNodes::TDqStage stage, size_t inputIndex, + NYql::NNodes::TKqpReadRangesSourceSettings settings, NYql::TExprContext& ctx) +{ + auto source = stage.Inputs().Item(inputIndex).Cast<TDqSource>(); + auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>(); + auto sourceArg = stage.Program().Args().Arg(inputIndex); + + TVector<NYql::NNodes::TExprBase> inputs; + size_t index = 0; + for (auto input : stage.Inputs()) { + if (index == inputIndex) { + inputs.push_back( + Build<TDqSource>(ctx, input.Pos()) + .DataSource<TCoDataSource>() + .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build() + .Build() + .Settings(settings) + .Done()); + } else { + inputs.push_back(input); + } + index += 1; + } + + return Build<TDqStage>(ctx, stage.Pos()) + .Settings(stage.Settings()) + .Inputs().Add(inputs).Build() + .Outputs(stage.Outputs()) + .Program(stage.Program()) + .Done(); +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h index fbab107c0d..87fe737722 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h @@ -22,6 +22,12 @@ NYql::NNodes::TCoAtomList BuildColumnsList(const THashSet<TStringBuf>& columns, NYql::NNodes::TCoAtomList BuildColumnsList(const TVector<TString>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); +NYql::NNodes::TDqStage ReplaceStageArg(NYql::NNodes::TDqStage stage, size_t inputIndex, + NYql::NNodes::TCoArgument replaceArg, NYql::NNodes::TExprBase bodyExpression, NYql::TExprContext& ctx); + +NYql::NNodes::TDqStage ReplaceTableSourceSettings(NYql::NNodes::TDqStage stage, size_t inputIndex, + NYql::NNodes::TKqpReadRangesSourceSettings settings, NYql::TExprContext& ctx); + } // NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp index c3579fdbbb..f4f5c06bfc 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp @@ -38,7 +38,6 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons return node; // already set? } - NYql::TNodeOnNodeOwnedMap replaces; auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex); TExprNode::TPtr foundTake; bool singleConsumer = true; @@ -105,6 +104,7 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons .Input(limitValue.Cast()) .Done().Ptr()); } + auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos()) .Table(readRangesSource.Table()) .Columns(readRangesSource.Columns()) @@ -112,9 +112,8 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons .RangesExpr(readRangesSource.RangesExpr()) .ExplainPrompt(readRangesSource.ExplainPrompt()) .Done(); - replaces[readRangesSource.Raw()] = newSettings.Ptr(); - - return TExprBase(ctx.ReplaceNodes(node.Ptr(), replaces)); + + return ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx); } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp index 71b1d565cc..58f7e35062 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp @@ -204,8 +204,7 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource( auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path()); TVector<NYql::TKqpReadTableSettings> newSettings; - NYql::TNodeOnNodeOwnedMap replaces; - auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex); + NYql::TNodeOnNodeOwnedMap bodyReplaces; VisitExpr(stage.Program().Body().Ptr(), [&](const TExprNode::TPtr& exprPtr) -> bool { TExprBase expr(exprPtr); @@ -225,7 +224,7 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource( return input; }); if (newExpr.Ptr() != expr.Ptr()) { - replaces[expr.Raw()] = newExpr.Ptr(); + bodyReplaces[expr.Raw()] = newExpr.Ptr(); } } return true; @@ -238,17 +237,28 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource( } } - auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos()) - .Table(readRangesSource.Table()) - .Columns(readRangesSource.Columns()) - .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos())) - .RangesExpr(readRangesSource.RangesExpr()) - .ExplainPrompt(readRangesSource.ExplainPrompt()) - .Done(); - replaces[readRangesSource.Raw()] = newSource.Ptr(); + if (settings != newSettings[0]) { + auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos()) + .Table(readRangesSource.Table()) + .Columns(readRangesSource.Columns()) + .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos())) + .RangesExpr(readRangesSource.RangesExpr()) + .ExplainPrompt(readRangesSource.ExplainPrompt()) + .Done(); + stage = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSource, exprCtx); + } + } + + if (bodyReplaces.empty()) { + return stage; } - return TExprBase(exprCtx.ReplaceNodes(node.Ptr(), replaces)); + return Build<TDqStage>(exprCtx, stage.Pos()) + .Inputs(stage.Inputs()) + .Outputs(stage.Outputs()) + .Settings(stage.Settings()) + .Program(TCoLambda(exprCtx.ReplaceNodes(stage.Program().Ptr(), bodyReplaces))) + .Done(); } } // namespace NKikimr::NKqp::NOpt |