diff options
author | imunkin <imunkin@yandex-team.com> | 2025-02-11 13:45:10 +0300 |
---|---|---|
committer | imunkin <imunkin@yandex-team.com> | 2025-02-11 14:34:43 +0300 |
commit | 50c6140bfda6b86cd98464d7df5feb27c78a496c (patch) | |
tree | c73ab45d6e517b6ce1a92c9dfc244ae4f24eb97b /yql/essentials/minikql/mkql_program_builder.cpp | |
parent | 8e899a3e9c25ea69dacab575d77421116dbef8bb (diff) | |
download | ydb-50c6140bfda6b86cd98464d7df5feb27c78a496c.tar.gz |
YQL-19424: Use WideStream instead of WideFlow in WideToBlocks computation node
commit_hash:0c0bfb556ff1f51f3293899c0364cd56c3965c41
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 41 |
1 files changed, 28 insertions, 13 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index d7df4981b3..717eb9ec32 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -1476,21 +1476,36 @@ TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) { return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) { - TType* outputItemType; - { - const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); - std::vector<TType*> outputItems; - outputItems.reserve(wideComponents.size()); - for (size_t i = 0; i < wideComponents.size(); ++i) { - outputItems.push_back(NewBlockType(wideComponents[i], TBlockType::EShape::Many)); - } - outputItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); - outputItemType = NewMultiType(outputItems); +TType* TProgramBuilder::BuildWideBlockType(const TArrayRef<TType* const>& wideComponents) { + std::vector<TType*> blockItems; + blockItems.reserve(wideComponents.size()); + for (size_t i = 0; i < wideComponents.size(); i++) { + blockItems.push_back(NewBlockType(wideComponents[i], TBlockType::EShape::Many)); } + blockItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); + return NewMultiType(blockItems); +} - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputItemType)); - callableBuilder.Add(flow); +TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode stream) { + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected WideStream as input type"); + if constexpr (RuntimeVersion < 58U) { + // Preserve the old behaviour for ABI compatibility. + // Emit (FromFlow (WideToBlocks (ToFlow (<stream>)))) to + // process the flow in favor to the given stream following + // the older MKQL ABI. + // FIXME: Drop the branch below, when the time comes. + const auto inputFlow = ToFlow(stream); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, inputFlow.GetStaticType())); + TType* outputMultiType = BuildWideBlockType(wideComponents); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType)); + callableBuilder.Add(inputFlow); + const auto outputFlow = TRuntimeNode(callableBuilder.Build(), false); + return FromFlow(outputFlow); + } + const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, stream.GetStaticType())); + TType* outputMultiType = BuildWideBlockType(wideComponents); + TCallableBuilder callableBuilder(Env, __func__, NewStreamType(outputMultiType)); + callableBuilder.Add(stream); return TRuntimeNode(callableBuilder.Build(), false); } |