diff options
author | Andrey Neporada <aneporada@ydb.tech> | 2023-12-30 00:04:52 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-30 00:04:52 +0300 |
commit | cfe15202c36eb3112e4fb3f995fa1ecf56f628e0 (patch) | |
tree | b8a2c484d4f30ee33ed5b63259082de34792ab1c | |
parent | af6fb5d78323cf77cfcad7c79134a6c163d3897a (diff) | |
download | ydb-cfe15202c36eb3112e4fb3f995fa1ecf56f628e0.tar.gz |
fix DqReplicate expansion (#818)
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_peephole.cpp | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp index f89670fe4ee..ed079387ffd 100644 --- a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp @@ -607,11 +607,29 @@ NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExp TVector<TExprBase> branches; branches.reserve(dqReplicate.Args().Count() - 1); + const auto inputKind = dqReplicate.Arg(0).Ref().GetTypeAnn()->GetKind(); + YQL_ENSURE(inputKind == ETypeAnnotationKind::Stream || inputKind == ETypeAnnotationKind::Flow); auto inputIndex = NDq::BuildAtomList("0", dqReplicate.Pos(), ctx); for (size_t i = 1; i < dqReplicate.Args().Count(); ++i) { branches.emplace_back(inputIndex); - branches.emplace_back(ctx.DeepCopyLambda(dqReplicate.Args().Get(i).Ref())); + const auto lambdaOutputKind = dqReplicate.Arg(i).Ref().GetTypeAnn()->GetKind(); + YQL_ENSURE(lambdaOutputKind == ETypeAnnotationKind::Stream || lambdaOutputKind == ETypeAnnotationKind::Flow); + if (lambdaOutputKind != inputKind) { + branches.emplace_back(ctx.Builder(dqReplicate.Arg(i).Pos()) + .Lambda() + .Param("arg") + .Callable(lambdaOutputKind == ETypeAnnotationKind::Stream ? "ToFlow" : "FromFlow") + .Apply(0, dqReplicate.Arg(i).Ptr()) + .With(0, "arg") + .Seal() + .Seal() + .Seal() + .Build() + ); + } else { + branches.emplace_back(ctx.DeepCopyLambda(dqReplicate.Arg(i).Ref())); + } } return Build<TCoSwitch>(ctx, dqReplicate.Pos()) |