aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Munkin <imunkin@ydb.tech>2025-03-26 13:22:40 +0300
committerGitHub <noreply@github.com>2025-03-26 15:22:40 +0500
commitbad75e027ea1ec75f59624443648bb5ef71c73b7 (patch)
treef60b734f46ede0c0b1972c7128e5aa0e4dd9bb80
parent9b00d50ef60fd927443f0fde6c6889c27f67933f (diff)
downloadydb-bad75e027ea1ec75f59624443648bb5ef71c73b7.tar.gz
Make DQ use both WideStream and WideFlow for ReplicateScalars (#16227)
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_build.cpp76
1 files changed, 51 insertions, 25 deletions
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp
index 11621c6531f..ef6e8a85e62 100644
--- a/ydb/library/yql/dq/opt/dq_opt_build.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp
@@ -645,12 +645,16 @@ TDqPhyStage DqEnableWideChannelsInputForStage(const TDqPhyStage& stage, TExprCon
}
bool CanPullReplicateScalars(const TDqPhyStage& stage) {
- auto maybeFromFlow = stage.Program().Body().Maybe<TCoFromFlow>();
- if (!maybeFromFlow) {
- return false;
+ if constexpr (!NYql::NBlockStreamIO::ReplicateScalars) {
+ auto maybeFromFlow = stage.Program().Body().Maybe<TCoFromFlow>();
+ if (!maybeFromFlow) {
+ return false;
+ }
+
+ return bool(maybeFromFlow.Cast().Input().Maybe<TCoReplicateScalars>());
}
- return bool(maybeFromFlow.Cast().Input().Maybe<TCoReplicateScalars>());
+ return bool(stage.Program().Body().Maybe<TCoReplicateScalars>());
}
bool CanPullReplicateScalars(const TDqOutput& output) {
@@ -694,33 +698,55 @@ TDqPhyStage DqPullReplicateScalarsFromInputs(const TDqPhyStage& stage, TExprCont
TDqPhyStage childStage = conn.Output().Stage().Cast<TDqPhyStage>();
TCoLambda childProgram(ctx.DeepCopyLambda(childStage.Program().Ref()));
- TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoFromFlow>().Input().Cast<TCoReplicateScalars>();
+ TMaybeNode<TExprBase> newChildStage;
+ TExprNode::TPtr argReplace;
+ TExprNode::TPtr newArgNode = newArg.Ptr();
+ if constexpr (!NYql::NBlockStreamIO::ReplicateScalars) {
+ TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoFromFlow>().Input().Cast<TCoReplicateScalars>();
+
+ // replace FromFlow(ReplicateScalars(x, ...)) with FromFlow(x)
+ newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos())
+ .InitFrom(childStage)
+ .Program()
+ .Args(childProgram.Args())
+ .Body(ctx.ChangeChild(childProgram.Body().Ref(), TCoFromFlow::idx_Input, childReplicateScalars.Input().Ptr()))
+ .Build()
+ .Done();
+ argReplace = Build<TCoFromFlow>(ctx, arg.Pos())
+ .Input<TCoReplicateScalars>()
+ .Input<TCoToFlow>()
+ .Input(newArgNode)
+ .Build()
+ .Indexes(childReplicateScalars.Indexes())
+ .Build()
+ .Done()
+ .Ptr();
+ } else {
+ TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoReplicateScalars>();
+
+ // replace (ReplicateScalars(x, ...)) with (x)
+ newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos())
+ .InitFrom(childStage)
+ .Program()
+ .Args(childProgram.Args())
+ .Body(childReplicateScalars.Input())
+ .Build()
+ .Done();
+
+ argReplace = Build<TCoReplicateScalars>(ctx, arg.Pos())
+ .Input(newArgNode)
+ .Indexes(childReplicateScalars.Indexes())
+ .Done()
+ .Ptr();
+ }
- // replace FromFlow(ReplicateScalars(x, ...)) with FromFlow(x)
- auto newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos())
- .InitFrom(childStage)
- .Program()
- .Args(childProgram.Args())
- .Body(ctx.ChangeChild(childProgram.Body().Ref(), TCoFromFlow::idx_Input, childReplicateScalars.Input().Ptr()))
- .Build()
- .Done();
auto newOutput = Build<TDqOutput>(ctx, conn.Output().Pos())
.InitFrom(conn.Output())
- .Stage(newChildStage)
+ .Stage(newChildStage.Cast().Ptr())
.Done();
- newInputs.push_back(ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr()));
- TExprNode::TPtr newArgNode = newArg.Ptr();
- TExprNode::TPtr argReplace = Build<TCoFromFlow>(ctx, arg.Pos())
- .Input<TCoReplicateScalars>()
- .Input<TCoToFlow>()
- .Input(newArgNode)
- .Build()
- .Indexes(childReplicateScalars.Indexes())
- .Build()
- .Done()
- .Ptr();
argsMap.emplace(arg.Raw(), argReplace);
+ newInputs.push_back(ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr()));
} else {
argsMap.emplace(arg.Raw(), newArg.Ptr());
newInputs.push_back(stage.Inputs().Item(i).Ptr());