aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrey Neporada <aneporada@ydb.tech>2023-12-30 00:04:52 +0300
committerGitHub <noreply@github.com>2023-12-30 00:04:52 +0300
commitcfe15202c36eb3112e4fb3f995fa1ecf56f628e0 (patch)
treeb8a2c484d4f30ee33ed5b63259082de34792ab1c
parentaf6fb5d78323cf77cfcad7c79134a6c163d3897a (diff)
downloadydb-cfe15202c36eb3112e4fb3f995fa1ecf56f628e0.tar.gz
fix DqReplicate expansion (#818)
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_peephole.cpp20
1 files changed, 19 insertions, 1 deletions
diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
index f89670fe4ee..ed079387ffd 100644
--- a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
@@ -607,11 +607,29 @@ NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExp
TVector<TExprBase> branches;
branches.reserve(dqReplicate.Args().Count() - 1);
+ const auto inputKind = dqReplicate.Arg(0).Ref().GetTypeAnn()->GetKind();
+ YQL_ENSURE(inputKind == ETypeAnnotationKind::Stream || inputKind == ETypeAnnotationKind::Flow);
auto inputIndex = NDq::BuildAtomList("0", dqReplicate.Pos(), ctx);
for (size_t i = 1; i < dqReplicate.Args().Count(); ++i) {
branches.emplace_back(inputIndex);
- branches.emplace_back(ctx.DeepCopyLambda(dqReplicate.Args().Get(i).Ref()));
+ const auto lambdaOutputKind = dqReplicate.Arg(i).Ref().GetTypeAnn()->GetKind();
+ YQL_ENSURE(lambdaOutputKind == ETypeAnnotationKind::Stream || lambdaOutputKind == ETypeAnnotationKind::Flow);
+ if (lambdaOutputKind != inputKind) {
+ branches.emplace_back(ctx.Builder(dqReplicate.Arg(i).Pos())
+ .Lambda()
+ .Param("arg")
+ .Callable(lambdaOutputKind == ETypeAnnotationKind::Stream ? "ToFlow" : "FromFlow")
+ .Apply(0, dqReplicate.Arg(i).Ptr())
+ .With(0, "arg")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build()
+ );
+ } else {
+ branches.emplace_back(ctx.DeepCopyLambda(dqReplicate.Arg(i).Ref()));
+ }
}
return Build<TCoSwitch>(ctx, dqReplicate.Pos())