summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authoratarasov5 <[email protected]>2025-07-09 12:58:08 +0300
committeratarasov5 <[email protected]>2025-07-09 13:23:47 +0300
commit7c10e44e4cedbf09d754e8ff05392a9d168af143 (patch)
treeefcb9964c9ae273f31c7e4893b41421aaaffff0b /yql/essentials/minikql/mkql_program_builder.cpp
parent3611e9a59db095abcc485ca2609a38274bbec210 (diff)
YQL-20080: flow -> stream rewrite
В этом пре переписал ноды `Wide{Top,TopSort,Sort}Blocks` с flow на stream реализацию. Я разбил пр на два коммита: первый, это просто двигаю классы вверх вниз. Второй - сами изменения. [Прогон тестов](https://nda.ya.ru/t/P9kfAmHr7GFmgy с понижением Runtime версии commit_hash:0813a74aaa904b12846692c0e7504334170ea6db
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp51
1 files changed, 39 insertions, 12 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 274090f7b18..49332e3fe0c 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -1574,15 +1574,15 @@ TRuntimeNode TProgramBuilder::WideTakeBlocks(TRuntimeNode flow, TRuntimeNode cou
}
TRuntimeNode TProgramBuilder::WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) {
- return BuildWideTopOrSort(__func__, flow, count, keys);
+ return BuildWideTopOrSort(__func__, flow, count, keys, /*isBlocks=*/true);
}
TRuntimeNode TProgramBuilder::WideTopSortBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) {
- return BuildWideTopOrSort(__func__, flow, count, keys);
+ return BuildWideTopOrSort(__func__, flow, count, keys, /*isBlocks=*/true);
}
TRuntimeNode TProgramBuilder::WideSortBlocks(TRuntimeNode flow, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) {
- return BuildWideTopOrSort(__func__, flow, Nothing(), keys);
+ return BuildWideTopOrSort(__func__, flow, Nothing(), keys, /*isBlocks=*/true);
}
TRuntimeNode TProgramBuilder::AsScalar(TRuntimeNode value) {
@@ -1900,25 +1900,42 @@ TRuntimeNode TProgramBuilder::Sort(TRuntimeNode list, TRuntimeNode ascending, co
TRuntimeNode TProgramBuilder::WideTop(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys)
{
- return BuildWideTopOrSort(__func__, flow, count, keys);
+ return BuildWideTopOrSort(__func__, flow, count, keys, /*isBlocks=*/false);
}
TRuntimeNode TProgramBuilder::WideTopSort(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys)
{
- return BuildWideTopOrSort(__func__, flow, count, keys);
+ return BuildWideTopOrSort(__func__, flow, count, keys, /*isBlocks=*/false);
}
TRuntimeNode TProgramBuilder::WideSort(TRuntimeNode flow, const std::vector<std::pair<ui32, TRuntimeNode>>& keys)
{
- return BuildWideTopOrSort(__func__, flow, Nothing(), keys);
+ return BuildWideTopOrSort(__func__, flow, Nothing(), keys, /*isBlocks=*/false);
}
-TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callableName, TRuntimeNode flow, TMaybe<TRuntimeNode> count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) {
- const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, flow.GetStaticType()));
- MKQL_ENSURE(!keys.empty() && keys.size() <= width, "Unexpected keys count: " << keys.size());
+TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callableName, TRuntimeNode stream, TMaybe<TRuntimeNode> count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys, bool isBlocks) {
+ if (isBlocks) {
+ return BuildWideTopOrSortImpl(callableName, stream, count, keys, TType::EKind::Stream);
+ } else {
+ return BuildWideTopOrSortImpl(callableName, stream, count, keys, TType::EKind::Flow);
+ }
+}
- TCallableBuilder callableBuilder(Env_, callableName, flow.GetStaticType());
- callableBuilder.Add(flow);
+TRuntimeNode TProgramBuilder::BuildWideTopOrSortImpl(const std::string_view& callableName, TRuntimeNode stream, TMaybe<TRuntimeNode> count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys, TType::EKind streamKind) {
+ MKQL_ENSURE(stream.GetStaticType()->GetKind() == streamKind, "Mismatched input type");
+ const auto width = GetWideComponentsCount(stream.GetStaticType());
+ MKQL_ENSURE(!keys.empty() && keys.size() <= width, "Unexpected keys count: " << keys.size());
+ bool shouldRewriteToFlow = RuntimeVersion < 64U && streamKind == TType::EKind::Stream;
+ if (shouldRewriteToFlow) {
+ // Preserve the old behaviour for ABI compatibility.
+ // Emit (FromFlow (Wide{Top,TopSort,Sort}Blocks (ToFlow (<stream>)))) to
+ // process the flow in favor to the given stream following
+ // the older MKQL ABI.
+ // FIXME: Drop the branch below, when the time comes.
+ stream = ToFlow(stream);
+ }
+ TCallableBuilder callableBuilder(Env_, callableName, stream.GetStaticType());
+ callableBuilder.Add(stream);
if (count) {
callableBuilder.Add(*count);
}
@@ -1928,7 +1945,17 @@ TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callabl
callableBuilder.Add(NewDataLiteral(key.first));
callableBuilder.Add(key.second);
});
- return TRuntimeNode(callableBuilder.Build(), false);
+
+ auto resultNode = TRuntimeNode(callableBuilder.Build(), false);
+ if (shouldRewriteToFlow) {
+ // Preserve the old behaviour for ABI compatibility.
+ // Emit (FromFlow (Wide{Top,TopSort,Sort}Blocks (ToFlow (<stream>)))) to
+ // process the flow in favor to the given stream following
+ // the older MKQL ABI.
+ // FIXME: Drop the branch below, when the time comes.
+ return FromFlow(resultNode);
+ }
+ return resultNode;
}
TRuntimeNode TProgramBuilder::Top(TRuntimeNode flow, TRuntimeNode count, TRuntimeNode ascending, const TUnaryLambda& keyExtractor) {