diff options
author | imunkin <imunkin@yandex-team.com> | 2025-01-17 16:12:38 +0300 |
---|---|---|
committer | imunkin <imunkin@yandex-team.com> | 2025-01-17 16:29:14 +0300 |
commit | 5bf3fd8cf1463c43723f1c8ba9d2322073f93c04 (patch) | |
tree | 89f3800c888c589798de9e4f5a34384eda2f6808 /yql/essentials/minikql/mkql_program_builder.cpp | |
parent | e2b9ea56b89acb80b934fcfa26a782bb92e1d39e (diff) | |
download | ydb-5bf3fd8cf1463c43723f1c8ba9d2322073f93c04.tar.gz |
YQL-19424: Use WideStream instead of WideFlow in WideFromBlocks computation node
commit_hash:21c84a9004cc57883d949b8fc637fc3ae7bfbda9
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 82c1f604ba..6e832cf509 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -1503,12 +1503,28 @@ TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) { return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode flow) { - auto outputItems = ValidateBlockFlowType(flow.GetStaticType()); +TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode stream) { + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected WideStream as input type"); + if constexpr (RuntimeVersion < 54U) { + // Preserve the old behaviour for ABI compatibility. + // Emit (FromFlow (WideFromBlocks (ToFlow (<stream>)))) to + // process the flow in favor to the given stream following + // the older MKQL ABI. + // FIXME: Drop the branch below, when the time comes. + const auto inputFlow = ToFlow(stream); + auto outputItems = ValidateBlockFlowType(inputFlow.GetStaticType()); + outputItems.pop_back(); + TType* outputMultiType = NewMultiType(outputItems); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType)); + callableBuilder.Add(inputFlow); + const auto outputFlow = TRuntimeNode(callableBuilder.Build(), false); + return FromFlow(outputFlow); + } + auto outputItems = ValidateBlockStreamType(stream.GetStaticType()); outputItems.pop_back(); TType* outputMultiType = NewMultiType(outputItems); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType)); - callableBuilder.Add(flow); + TCallableBuilder callableBuilder(Env, __func__, NewStreamType(outputMultiType)); + callableBuilder.Add(stream); return TRuntimeNode(callableBuilder.Build(), false); } |