aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2025-02-11 13:45:10 +0300
committerimunkin <imunkin@yandex-team.com>2025-02-11 14:34:43 +0300
commit50c6140bfda6b86cd98464d7df5feb27c78a496c (patch)
treec73ab45d6e517b6ce1a92c9dfc244ae4f24eb97b /yql/essentials/minikql/mkql_program_builder.cpp
parent8e899a3e9c25ea69dacab575d77421116dbef8bb (diff)
downloadydb-50c6140bfda6b86cd98464d7df5feb27c78a496c.tar.gz
YQL-19424: Use WideStream instead of WideFlow in WideToBlocks computation node
commit_hash:0c0bfb556ff1f51f3293899c0364cd56c3965c41
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp41
1 files changed, 28 insertions, 13 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index d7df4981b3..717eb9ec32 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -1476,21 +1476,36 @@ TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) {
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) {
- TType* outputItemType;
- {
- const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
- std::vector<TType*> outputItems;
- outputItems.reserve(wideComponents.size());
- for (size_t i = 0; i < wideComponents.size(); ++i) {
- outputItems.push_back(NewBlockType(wideComponents[i], TBlockType::EShape::Many));
- }
- outputItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar));
- outputItemType = NewMultiType(outputItems);
+TType* TProgramBuilder::BuildWideBlockType(const TArrayRef<TType* const>& wideComponents) {
+ std::vector<TType*> blockItems;
+ blockItems.reserve(wideComponents.size());
+ for (size_t i = 0; i < wideComponents.size(); i++) {
+ blockItems.push_back(NewBlockType(wideComponents[i], TBlockType::EShape::Many));
}
+ blockItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar));
+ return NewMultiType(blockItems);
+}
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputItemType));
- callableBuilder.Add(flow);
+TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode stream) {
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected WideStream as input type");
+ if constexpr (RuntimeVersion < 58U) {
+ // Preserve the old behaviour for ABI compatibility.
+ // Emit (FromFlow (WideToBlocks (ToFlow (<stream>)))) to
+ // process the flow in favor to the given stream following
+ // the older MKQL ABI.
+ // FIXME: Drop the branch below, when the time comes.
+ const auto inputFlow = ToFlow(stream);
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, inputFlow.GetStaticType()));
+ TType* outputMultiType = BuildWideBlockType(wideComponents);
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType));
+ callableBuilder.Add(inputFlow);
+ const auto outputFlow = TRuntimeNode(callableBuilder.Build(), false);
+ return FromFlow(outputFlow);
+ }
+ const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, stream.GetStaticType()));
+ TType* outputMultiType = BuildWideBlockType(wideComponents);
+ TCallableBuilder callableBuilder(Env, __func__, NewStreamType(outputMultiType));
+ callableBuilder.Add(stream);
return TRuntimeNode(callableBuilder.Build(), false);
}