diff options
| author | atarasov5 <[email protected]> | 2025-08-01 12:50:59 +0300 |
|---|---|---|
| committer | atarasov5 <[email protected]> | 2025-08-01 13:19:39 +0300 |
| commit | dd74f77fb65e154f13376538c07dc908ac55cc3b (patch) | |
| tree | dff76c3080e7b59b0008b13729bfcafedfec9d84 /yql/essentials/minikql/mkql_program_builder.cpp | |
| parent | 0cb0942d9ea385bc978073f3b4ea866052f2b73e (diff) | |
YQL-20229: Add WideMap stream overload
commit_hash:297647045a9ca9c90137f0ec6488181f81fe2447
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
| -rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 93c808e51db..284f2222c30 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -3768,9 +3768,10 @@ TRuntimeNode TProgramBuilder::ExpandMap(TRuntimeNode flow, const TExpandLambda& return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& handler) { - const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); - +TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flowOrStream, const TWideLambda& handler) { + MKQL_ENSURE(flowOrStream.GetStaticType()->IsFlow() || flowOrStream.GetStaticType()->IsStream(), "Flow or stream type expected."); + const auto wideComponents = GetWideComponents(flowOrStream.GetStaticType()); + bool shouldRewriteToFlow = RuntimeVersion < 67 && flowOrStream.GetStaticType()->IsStream(); TRuntimeNode::TList itemArgs; itemArgs.reserve(wideComponents.size()); auto i = 0U; @@ -3782,11 +3783,25 @@ TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& hand tupleItems.reserve(newItems.size()); std::transform(newItems.cbegin(), newItems.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1)); - TCallableBuilder callableBuilder(Env_, __func__, NewFlowType(NewMultiType(tupleItems))); - callableBuilder.Add(flow); - std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); - std::for_each(newItems.cbegin(), newItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); - return TRuntimeNode(callableBuilder.Build(), false); + auto fillCallableBuilder = [&] (TCallableBuilder& builder, TRuntimeNode input) { + builder.Add(input); + std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(builder), std::placeholders::_1)); + std::for_each(newItems.cbegin(), newItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(builder), std::placeholders::_1)); + return TRuntimeNode(builder.Build(), false); + }; + + if (shouldRewriteToFlow) { + TCallableBuilder callableBuilder(Env_, __func__, NewFlowType(NewMultiType(tupleItems))); + return FromFlow(fillCallableBuilder(callableBuilder, ToFlow(flowOrStream))); + } else if (flowOrStream.GetStaticType()->IsFlow()) { + TCallableBuilder callableBuilder(Env_, __func__, NewFlowType(NewMultiType(tupleItems))); + return fillCallableBuilder(callableBuilder, flowOrStream); + } else if (flowOrStream.GetStaticType()->IsStream()) { + TCallableBuilder callableBuilder(Env_, __func__, NewStreamType(NewMultiType(tupleItems))); + return fillCallableBuilder(callableBuilder, flowOrStream); + } else { + Y_UNREACHABLE(); + } } TRuntimeNode TProgramBuilder::WideChain1Map(TRuntimeNode flow, const TWideLambda& init, const TBinaryWideLambda& update) { |
