summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authoratarasov5 <[email protected]>2025-08-01 12:50:59 +0300
committeratarasov5 <[email protected]>2025-08-01 13:19:39 +0300
commitdd74f77fb65e154f13376538c07dc908ac55cc3b (patch)
treedff76c3080e7b59b0008b13729bfcafedfec9d84 /yql/essentials/minikql/mkql_program_builder.cpp
parent0cb0942d9ea385bc978073f3b4ea866052f2b73e (diff)
YQL-20229: Add WideMap stream overload
commit_hash:297647045a9ca9c90137f0ec6488181f81fe2447
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp31
1 files changed, 23 insertions, 8 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 93c808e51db..284f2222c30 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -3768,9 +3768,10 @@ TRuntimeNode TProgramBuilder::ExpandMap(TRuntimeNode flow, const TExpandLambda&
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& handler) {
- const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
-
+TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flowOrStream, const TWideLambda& handler) {
+ MKQL_ENSURE(flowOrStream.GetStaticType()->IsFlow() || flowOrStream.GetStaticType()->IsStream(), "Flow or stream type expected.");
+ const auto wideComponents = GetWideComponents(flowOrStream.GetStaticType());
+ bool shouldRewriteToFlow = RuntimeVersion < 67 && flowOrStream.GetStaticType()->IsStream();
TRuntimeNode::TList itemArgs;
itemArgs.reserve(wideComponents.size());
auto i = 0U;
@@ -3782,11 +3783,25 @@ TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& hand
tupleItems.reserve(newItems.size());
std::transform(newItems.cbegin(), newItems.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1));
- TCallableBuilder callableBuilder(Env_, __func__, NewFlowType(NewMultiType(tupleItems)));
- callableBuilder.Add(flow);
- std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
- std::for_each(newItems.cbegin(), newItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
- return TRuntimeNode(callableBuilder.Build(), false);
+ auto fillCallableBuilder = [&] (TCallableBuilder& builder, TRuntimeNode input) {
+ builder.Add(input);
+ std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(builder), std::placeholders::_1));
+ std::for_each(newItems.cbegin(), newItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(builder), std::placeholders::_1));
+ return TRuntimeNode(builder.Build(), false);
+ };
+
+ if (shouldRewriteToFlow) {
+ TCallableBuilder callableBuilder(Env_, __func__, NewFlowType(NewMultiType(tupleItems)));
+ return FromFlow(fillCallableBuilder(callableBuilder, ToFlow(flowOrStream)));
+ } else if (flowOrStream.GetStaticType()->IsFlow()) {
+ TCallableBuilder callableBuilder(Env_, __func__, NewFlowType(NewMultiType(tupleItems)));
+ return fillCallableBuilder(callableBuilder, flowOrStream);
+ } else if (flowOrStream.GetStaticType()->IsStream()) {
+ TCallableBuilder callableBuilder(Env_, __func__, NewStreamType(NewMultiType(tupleItems)));
+ return fillCallableBuilder(callableBuilder, flowOrStream);
+ } else {
+ Y_UNREACHABLE();
+ }
}
TRuntimeNode TProgramBuilder::WideChain1Map(TRuntimeNode flow, const TWideLambda& init, const TBinaryWideLambda& update) {