diff options
author | Igor Munkin <imunkin@ydb.tech> | 2025-03-26 13:22:40 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-26 15:22:40 +0500 |
commit | bad75e027ea1ec75f59624443648bb5ef71c73b7 (patch) | |
tree | f60b734f46ede0c0b1972c7128e5aa0e4dd9bb80 | |
parent | 9b00d50ef60fd927443f0fde6c6889c27f67933f (diff) | |
download | ydb-bad75e027ea1ec75f59624443648bb5ef71c73b7.tar.gz |
Make DQ use both WideStream and WideFlow for ReplicateScalars (#16227)
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_build.cpp | 76 |
1 files changed, 51 insertions, 25 deletions
diff --git a/ydb/library/yql/dq/opt/dq_opt_build.cpp b/ydb/library/yql/dq/opt/dq_opt_build.cpp index 11621c6531f..ef6e8a85e62 100644 --- a/ydb/library/yql/dq/opt/dq_opt_build.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_build.cpp @@ -645,12 +645,16 @@ TDqPhyStage DqEnableWideChannelsInputForStage(const TDqPhyStage& stage, TExprCon } bool CanPullReplicateScalars(const TDqPhyStage& stage) { - auto maybeFromFlow = stage.Program().Body().Maybe<TCoFromFlow>(); - if (!maybeFromFlow) { - return false; + if constexpr (!NYql::NBlockStreamIO::ReplicateScalars) { + auto maybeFromFlow = stage.Program().Body().Maybe<TCoFromFlow>(); + if (!maybeFromFlow) { + return false; + } + + return bool(maybeFromFlow.Cast().Input().Maybe<TCoReplicateScalars>()); } - return bool(maybeFromFlow.Cast().Input().Maybe<TCoReplicateScalars>()); + return bool(stage.Program().Body().Maybe<TCoReplicateScalars>()); } bool CanPullReplicateScalars(const TDqOutput& output) { @@ -694,33 +698,55 @@ TDqPhyStage DqPullReplicateScalarsFromInputs(const TDqPhyStage& stage, TExprCont TDqPhyStage childStage = conn.Output().Stage().Cast<TDqPhyStage>(); TCoLambda childProgram(ctx.DeepCopyLambda(childStage.Program().Ref())); - TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoFromFlow>().Input().Cast<TCoReplicateScalars>(); + TMaybeNode<TExprBase> newChildStage; + TExprNode::TPtr argReplace; + TExprNode::TPtr newArgNode = newArg.Ptr(); + if constexpr (!NYql::NBlockStreamIO::ReplicateScalars) { + TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoFromFlow>().Input().Cast<TCoReplicateScalars>(); + + // replace FromFlow(ReplicateScalars(x, ...)) with FromFlow(x) + newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos()) + .InitFrom(childStage) + .Program() + .Args(childProgram.Args()) + .Body(ctx.ChangeChild(childProgram.Body().Ref(), TCoFromFlow::idx_Input, childReplicateScalars.Input().Ptr())) + .Build() + .Done(); + argReplace = Build<TCoFromFlow>(ctx, arg.Pos()) + .Input<TCoReplicateScalars>() + .Input<TCoToFlow>() + .Input(newArgNode) + .Build() + .Indexes(childReplicateScalars.Indexes()) + .Build() + .Done() + .Ptr(); + } else { + TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoReplicateScalars>(); + + // replace (ReplicateScalars(x, ...)) with (x) + newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos()) + .InitFrom(childStage) + .Program() + .Args(childProgram.Args()) + .Body(childReplicateScalars.Input()) + .Build() + .Done(); + + argReplace = Build<TCoReplicateScalars>(ctx, arg.Pos()) + .Input(newArgNode) + .Indexes(childReplicateScalars.Indexes()) + .Done() + .Ptr(); + } - // replace FromFlow(ReplicateScalars(x, ...)) with FromFlow(x) - auto newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos()) - .InitFrom(childStage) - .Program() - .Args(childProgram.Args()) - .Body(ctx.ChangeChild(childProgram.Body().Ref(), TCoFromFlow::idx_Input, childReplicateScalars.Input().Ptr())) - .Build() - .Done(); auto newOutput = Build<TDqOutput>(ctx, conn.Output().Pos()) .InitFrom(conn.Output()) - .Stage(newChildStage) + .Stage(newChildStage.Cast().Ptr()) .Done(); - newInputs.push_back(ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr())); - TExprNode::TPtr newArgNode = newArg.Ptr(); - TExprNode::TPtr argReplace = Build<TCoFromFlow>(ctx, arg.Pos()) - .Input<TCoReplicateScalars>() - .Input<TCoToFlow>() - .Input(newArgNode) - .Build() - .Indexes(childReplicateScalars.Indexes()) - .Build() - .Done() - .Ptr(); argsMap.emplace(arg.Raw(), argReplace); + newInputs.push_back(ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr())); } else { argsMap.emplace(arg.Raw(), newArg.Ptr()); newInputs.push_back(stage.Inputs().Item(i).Ptr()); |