aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2025-01-17 16:12:38 +0300
committerimunkin <imunkin@yandex-team.com>2025-01-17 16:29:14 +0300
commit5bf3fd8cf1463c43723f1c8ba9d2322073f93c04 (patch)
tree89f3800c888c589798de9e4f5a34384eda2f6808 /yql/essentials/minikql/mkql_program_builder.cpp
parente2b9ea56b89acb80b934fcfa26a782bb92e1d39e (diff)
downloadydb-5bf3fd8cf1463c43723f1c8ba9d2322073f93c04.tar.gz
YQL-19424: Use WideStream instead of WideFlow in WideFromBlocks computation node
commit_hash:21c84a9004cc57883d949b8fc637fc3ae7bfbda9
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp24
1 files changed, 20 insertions, 4 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 82c1f604ba..6e832cf509 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -1503,12 +1503,28 @@ TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) {
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode flow) {
- auto outputItems = ValidateBlockFlowType(flow.GetStaticType());
+TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode stream) {
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected WideStream as input type");
+ if constexpr (RuntimeVersion < 54U) {
+ // Preserve the old behaviour for ABI compatibility.
+ // Emit (FromFlow (WideFromBlocks (ToFlow (<stream>)))) to
+ // process the flow in favor to the given stream following
+ // the older MKQL ABI.
+ // FIXME: Drop the branch below, when the time comes.
+ const auto inputFlow = ToFlow(stream);
+ auto outputItems = ValidateBlockFlowType(inputFlow.GetStaticType());
+ outputItems.pop_back();
+ TType* outputMultiType = NewMultiType(outputItems);
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType));
+ callableBuilder.Add(inputFlow);
+ const auto outputFlow = TRuntimeNode(callableBuilder.Build(), false);
+ return FromFlow(outputFlow);
+ }
+ auto outputItems = ValidateBlockStreamType(stream.GetStaticType());
outputItems.pop_back();
TType* outputMultiType = NewMultiType(outputItems);
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType));
- callableBuilder.Add(flow);
+ TCallableBuilder callableBuilder(Env, __func__, NewStreamType(outputMultiType));
+ callableBuilder.Add(stream);
return TRuntimeNode(callableBuilder.Build(), false);
}