aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-02-05 02:16:20 +0300
committerssmike <ssmike@ydb.tech>2023-02-05 02:16:20 +0300
commitbc6c67e2881b48c46bc7ef1dedf6ae2db06bccba (patch)
treec6ca008539941b38282a23996b609e217c17c4b8
parent478f068cf2e44bce08b28c1c1f2e5f8eb67ad2e8 (diff)
downloadydb-bc6c67e2881b48c46bc7ef1dedf6ae2db06bccba.tar.gz
remove dangerous bulk-replaces
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp44
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp64
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h6
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp7
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp34
5 files changed, 106 insertions, 49 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 7a9d8cd8f7..2c898b3a4a 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -97,51 +97,29 @@ NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TE
}
}
- TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")};
- NYql::TNodeOnNodeOwnedMap bodyReplaces;
-
- bodyReplaces[sourceArg.Raw()] =
- Build<TCoExtractMembers>(ctx, node.Pos())
- .Members(readRangesSource.Columns())
- .Input<TCoSkipNullMembers>()
- .Input(replaceArg)
- .Members().Add(skipNullColumns).Build()
- .Build()
- .Done().Ptr();
-
- NYql::TNodeOnNodeOwnedMap inputsReplaces;
settings.SkipNullKeys.clear();
-
- auto newSource = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
+ auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
.Table(readRangesSource.Table())
.Columns().Add(columns).Build()
.Settings(settings.BuildNode(ctx, source.Settings().Pos()))
.RangesExpr(readRangesSource.RangesExpr())
.ExplainPrompt(readRangesSource.ExplainPrompt())
.Done();
- inputsReplaces[readRangesSource.Raw()] = newSource.Ptr();
-
- TVector<TCoArgument> args;
- for (auto arg : stage.Program().Args()) {
- if (arg.Raw() == sourceArg.Raw()) {
- args.push_back(replaceArg);
- } else {
- args.push_back(arg);
- }
- }
+ TDqStage replacedSettings = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx);
- return Build<TDqStage>(ctx, node.Pos())
- .Settings(stage.Settings())
- .Inputs(TExprList(ctx.ReplaceNodes(stage.Inputs().Ptr(), inputsReplaces)))
- .Outputs(stage.Outputs())
- .Program<TCoLambda>()
- .Args(args)
- .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), bodyReplaces))
+ TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")};
+ auto replaceExpr =
+ Build<TCoExtractMembers>(ctx, node.Pos())
+ .Members(readRangesSource.Columns())
+ .Input<TCoSkipNullMembers>()
+ .Input(replaceArg)
+ .Members().Add(skipNullColumns).Build()
.Build()
.Done();
+
+ return ReplaceStageArg(replacedSettings, *tableSourceIndex, replaceArg, replaceExpr, ctx);
}
-//FIXME: simplify KIKIMR-16987
TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
if (!node.Maybe<TKqlReadTable>()) {
return node;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
index a29f7df2bc..61eafa6e48 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
@@ -103,4 +103,68 @@ bool AllowFuseJoinInputs(TExprBase node) {
return true;
}
+NYql::NNodes::TDqStage ReplaceStageArg(NYql::NNodes::TDqStage stage, size_t inputIndex,
+ NYql::NNodes::TCoArgument replaceArg, NYql::NNodes::TExprBase bodyExpression, NYql::TExprContext& ctx)
+{
+ auto sourceArg = stage.Program().Args().Arg(inputIndex);
+
+ size_t index = 0;
+ TVector<TCoArgument> args;
+ NYql::TNodeOnNodeOwnedMap bodyReplaces;
+ for (auto arg : stage.Program().Args()) {
+ if (arg.Raw() == sourceArg.Raw()) {
+ args.push_back(replaceArg);
+ } else {
+ TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_" << index)};
+ args.push_back(replaceArg);
+ bodyReplaces[arg.Raw()] = replaceArg.Ptr();
+ }
+ index += 1;
+ }
+
+ bodyReplaces[sourceArg.Raw()] = bodyExpression.Ptr();
+
+ return Build<TDqStage>(ctx, stage.Pos())
+ .Settings(stage.Settings())
+ .Inputs(stage.Inputs())
+ .Outputs(stage.Outputs())
+ .Program<TCoLambda>()
+ .Args(args)
+ .Body(TExprBase(ctx.ReplaceNodes(stage.Program().Body().Ptr(), bodyReplaces)))
+ .Build()
+ .Done();
+}
+
+NYql::NNodes::TDqStage ReplaceTableSourceSettings(NYql::NNodes::TDqStage stage, size_t inputIndex,
+ NYql::NNodes::TKqpReadRangesSourceSettings settings, NYql::TExprContext& ctx)
+{
+ auto source = stage.Inputs().Item(inputIndex).Cast<TDqSource>();
+ auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>();
+ auto sourceArg = stage.Program().Args().Arg(inputIndex);
+
+ TVector<NYql::NNodes::TExprBase> inputs;
+ size_t index = 0;
+ for (auto input : stage.Inputs()) {
+ if (index == inputIndex) {
+ inputs.push_back(
+ Build<TDqSource>(ctx, input.Pos())
+ .DataSource<TCoDataSource>()
+ .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build()
+ .Build()
+ .Settings(settings)
+ .Done());
+ } else {
+ inputs.push_back(input);
+ }
+ index += 1;
+ }
+
+ return Build<TDqStage>(ctx, stage.Pos())
+ .Settings(stage.Settings())
+ .Inputs().Add(inputs).Build()
+ .Outputs(stage.Outputs())
+ .Program(stage.Program())
+ .Done();
+}
+
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h
index fbab107c0d..87fe737722 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h
@@ -22,6 +22,12 @@ NYql::NNodes::TCoAtomList BuildColumnsList(const THashSet<TStringBuf>& columns,
NYql::NNodes::TCoAtomList BuildColumnsList(const TVector<TString>& columns, NYql::TPositionHandle pos,
NYql::TExprContext& ctx);
+NYql::NNodes::TDqStage ReplaceStageArg(NYql::NNodes::TDqStage stage, size_t inputIndex,
+ NYql::NNodes::TCoArgument replaceArg, NYql::NNodes::TExprBase bodyExpression, NYql::TExprContext& ctx);
+
+NYql::NNodes::TDqStage ReplaceTableSourceSettings(NYql::NNodes::TDqStage stage, size_t inputIndex,
+ NYql::NNodes::TKqpReadRangesSourceSettings settings, NYql::TExprContext& ctx);
+
} // NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
index c3579fdbbb..f4f5c06bfc 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
@@ -38,7 +38,6 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons
return node; // already set?
}
- NYql::TNodeOnNodeOwnedMap replaces;
auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex);
TExprNode::TPtr foundTake;
bool singleConsumer = true;
@@ -105,6 +104,7 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons
.Input(limitValue.Cast())
.Done().Ptr());
}
+
auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
.Table(readRangesSource.Table())
.Columns(readRangesSource.Columns())
@@ -112,9 +112,8 @@ TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, cons
.RangesExpr(readRangesSource.RangesExpr())
.ExplainPrompt(readRangesSource.ExplainPrompt())
.Done();
- replaces[readRangesSource.Raw()] = newSettings.Ptr();
-
- return TExprBase(ctx.ReplaceNodes(node.Ptr(), replaces));
+
+ return ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx);
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
index 71b1d565cc..58f7e35062 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
@@ -204,8 +204,7 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path());
TVector<NYql::TKqpReadTableSettings> newSettings;
- NYql::TNodeOnNodeOwnedMap replaces;
- auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex);
+ NYql::TNodeOnNodeOwnedMap bodyReplaces;
VisitExpr(stage.Program().Body().Ptr(),
[&](const TExprNode::TPtr& exprPtr) -> bool {
TExprBase expr(exprPtr);
@@ -225,7 +224,7 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
return input;
});
if (newExpr.Ptr() != expr.Ptr()) {
- replaces[expr.Raw()] = newExpr.Ptr();
+ bodyReplaces[expr.Raw()] = newExpr.Ptr();
}
}
return true;
@@ -238,17 +237,28 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
}
}
- auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos())
- .Table(readRangesSource.Table())
- .Columns(readRangesSource.Columns())
- .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos()))
- .RangesExpr(readRangesSource.RangesExpr())
- .ExplainPrompt(readRangesSource.ExplainPrompt())
- .Done();
- replaces[readRangesSource.Raw()] = newSource.Ptr();
+ if (settings != newSettings[0]) {
+ auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos())
+ .Table(readRangesSource.Table())
+ .Columns(readRangesSource.Columns())
+ .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos()))
+ .RangesExpr(readRangesSource.RangesExpr())
+ .ExplainPrompt(readRangesSource.ExplainPrompt())
+ .Done();
+ stage = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSource, exprCtx);
+ }
+ }
+
+ if (bodyReplaces.empty()) {
+ return stage;
}
- return TExprBase(exprCtx.ReplaceNodes(node.Ptr(), replaces));
+ return Build<TDqStage>(exprCtx, stage.Pos())
+ .Inputs(stage.Inputs())
+ .Outputs(stage.Outputs())
+ .Settings(stage.Settings())
+ .Program(TCoLambda(exprCtx.ReplaceNodes(stage.Program().Ptr(), bodyReplaces)))
+ .Done();
}
} // namespace NKikimr::NKqp::NOpt