diff options
| author | atarasov5 <[email protected]> | 2025-07-09 12:58:08 +0300 |
|---|---|---|
| committer | atarasov5 <[email protected]> | 2025-07-09 13:23:47 +0300 |
| commit | 7c10e44e4cedbf09d754e8ff05392a9d168af143 (patch) | |
| tree | efcb9964c9ae273f31c7e4893b41421aaaffff0b /yql/essentials/minikql/mkql_program_builder.cpp | |
| parent | 3611e9a59db095abcc485ca2609a38274bbec210 (diff) | |
YQL-20080: flow -> stream rewrite
В этом пре переписал ноды `Wide{Top,TopSort,Sort}Blocks` с flow на stream реализацию.
Я разбил пр на два коммита: первый, это просто двигаю классы вверх вниз. Второй - сами изменения.
[Прогон тестов](https://nda.ya.ru/t/P9kfAmHr7GFmgy с понижением Runtime версии
commit_hash:0813a74aaa904b12846692c0e7504334170ea6db
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
| -rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 51 |
1 files changed, 39 insertions, 12 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 274090f7b18..49332e3fe0c 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -1574,15 +1574,15 @@ TRuntimeNode TProgramBuilder::WideTakeBlocks(TRuntimeNode flow, TRuntimeNode cou } TRuntimeNode TProgramBuilder::WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { - return BuildWideTopOrSort(__func__, flow, count, keys); + return BuildWideTopOrSort(__func__, flow, count, keys, /*isBlocks=*/true); } TRuntimeNode TProgramBuilder::WideTopSortBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { - return BuildWideTopOrSort(__func__, flow, count, keys); + return BuildWideTopOrSort(__func__, flow, count, keys, /*isBlocks=*/true); } TRuntimeNode TProgramBuilder::WideSortBlocks(TRuntimeNode flow, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { - return BuildWideTopOrSort(__func__, flow, Nothing(), keys); + return BuildWideTopOrSort(__func__, flow, Nothing(), keys, /*isBlocks=*/true); } TRuntimeNode TProgramBuilder::AsScalar(TRuntimeNode value) { @@ -1900,25 +1900,42 @@ TRuntimeNode TProgramBuilder::Sort(TRuntimeNode list, TRuntimeNode ascending, co TRuntimeNode TProgramBuilder::WideTop(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { - return BuildWideTopOrSort(__func__, flow, count, keys); + return BuildWideTopOrSort(__func__, flow, count, keys, /*isBlocks=*/false); } TRuntimeNode TProgramBuilder::WideTopSort(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { - return BuildWideTopOrSort(__func__, flow, count, keys); + return BuildWideTopOrSort(__func__, flow, count, keys, /*isBlocks=*/false); } TRuntimeNode TProgramBuilder::WideSort(TRuntimeNode flow, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { - return BuildWideTopOrSort(__func__, flow, Nothing(), keys); + return BuildWideTopOrSort(__func__, flow, Nothing(), keys, /*isBlocks=*/false); } -TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callableName, TRuntimeNode flow, TMaybe<TRuntimeNode> count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { - const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, flow.GetStaticType())); - MKQL_ENSURE(!keys.empty() && keys.size() <= width, "Unexpected keys count: " << keys.size()); +TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callableName, TRuntimeNode stream, TMaybe<TRuntimeNode> count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys, bool isBlocks) { + if (isBlocks) { + return BuildWideTopOrSortImpl(callableName, stream, count, keys, TType::EKind::Stream); + } else { + return BuildWideTopOrSortImpl(callableName, stream, count, keys, TType::EKind::Flow); + } +} - TCallableBuilder callableBuilder(Env_, callableName, flow.GetStaticType()); - callableBuilder.Add(flow); +TRuntimeNode TProgramBuilder::BuildWideTopOrSortImpl(const std::string_view& callableName, TRuntimeNode stream, TMaybe<TRuntimeNode> count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys, TType::EKind streamKind) { + MKQL_ENSURE(stream.GetStaticType()->GetKind() == streamKind, "Mismatched input type"); + const auto width = GetWideComponentsCount(stream.GetStaticType()); + MKQL_ENSURE(!keys.empty() && keys.size() <= width, "Unexpected keys count: " << keys.size()); + bool shouldRewriteToFlow = RuntimeVersion < 64U && streamKind == TType::EKind::Stream; + if (shouldRewriteToFlow) { + // Preserve the old behaviour for ABI compatibility. + // Emit (FromFlow (Wide{Top,TopSort,Sort}Blocks (ToFlow (<stream>)))) to + // process the flow in favor to the given stream following + // the older MKQL ABI. + // FIXME: Drop the branch below, when the time comes. + stream = ToFlow(stream); + } + TCallableBuilder callableBuilder(Env_, callableName, stream.GetStaticType()); + callableBuilder.Add(stream); if (count) { callableBuilder.Add(*count); } @@ -1928,7 +1945,17 @@ TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callabl callableBuilder.Add(NewDataLiteral(key.first)); callableBuilder.Add(key.second); }); - return TRuntimeNode(callableBuilder.Build(), false); + + auto resultNode = TRuntimeNode(callableBuilder.Build(), false); + if (shouldRewriteToFlow) { + // Preserve the old behaviour for ABI compatibility. + // Emit (FromFlow (Wide{Top,TopSort,Sort}Blocks (ToFlow (<stream>)))) to + // process the flow in favor to the given stream following + // the older MKQL ABI. + // FIXME: Drop the branch below, when the time comes. + return FromFlow(resultNode); + } + return resultNode; } TRuntimeNode TProgramBuilder::Top(TRuntimeNode flow, TRuntimeNode count, TRuntimeNode ascending, const TUnaryLambda& keyExtractor) { |
