summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authoratarasov5 <[email protected]>2025-07-15 17:03:11 +0300
committeratarasov5 <[email protected]>2025-07-15 17:35:09 +0300
commitb88aa58c4c0ab58d67b96c80573946c58f3ffaa2 (patch)
treea85406bb2b2e82e3be0ccbc4ac5db8d1bbf3a857 /yql/essentials/minikql/mkql_program_builder.cpp
parent64137fb0cbe9afe92dca8efc335ef9ff16b78926 (diff)
YQL-20098: Wide{Skip,Take}Blocks rewrite from Flow to Stream
Переписал Wide\{Skip,Take\}Blocks с flow на stream тип Прогон тестов с понижением версии <https://nda.ya.ru/t/as0XUfAF7GUcdA> commit_hash:b953c006690680e4711424f407db5af16b9c2e1c
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp25
1 files changed, 16 insertions, 9 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 49332e3fe0c..472a20b5bb3 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -1565,12 +1565,12 @@ TRuntimeNode TProgramBuilder::ListFromBlocks(TRuntimeNode list) {
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count) {
- return BuildWideSkipTakeBlocks(__func__, flow, count);
+TRuntimeNode TProgramBuilder::WideSkipBlocks(TRuntimeNode stream, TRuntimeNode count) {
+ return BuildWideSkipTakeBlocks(__func__, stream, count);
}
-TRuntimeNode TProgramBuilder::WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count) {
- return BuildWideSkipTakeBlocks(__func__, flow, count);
+TRuntimeNode TProgramBuilder::WideTakeBlocks(TRuntimeNode stream, TRuntimeNode count) {
+ return BuildWideSkipTakeBlocks(__func__, stream, count);
}
TRuntimeNode TProgramBuilder::WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) {
@@ -2758,16 +2758,23 @@ TRuntimeNode TProgramBuilder::BuildMinMax(const std::string_view& callableName,
return BuildMinMax(callableName, args.data(), args.size());
}
-TRuntimeNode TProgramBuilder::BuildWideSkipTakeBlocks(const std::string_view& callableName, TRuntimeNode flow, TRuntimeNode count) {
- ValidateBlockFlowType(flow.GetStaticType());
+TRuntimeNode TProgramBuilder::BuildWideSkipTakeBlocks(const std::string_view& callableName, TRuntimeNode stream, TRuntimeNode count) {
+ ValidateBlockStreamType(stream.GetStaticType());
+ if constexpr (RuntimeVersion < 65U) {
+ stream = ToFlow(stream);
+ }
MKQL_ENSURE(count.GetStaticType()->IsData(), "Expected data");
MKQL_ENSURE(static_cast<const TDataType&>(*count.GetStaticType()).GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64");
- TCallableBuilder callableBuilder(Env_, callableName, flow.GetStaticType());
- callableBuilder.Add(flow);
+ TCallableBuilder callableBuilder(Env_, callableName, stream.GetStaticType());
+ callableBuilder.Add(stream);
callableBuilder.Add(count);
- return TRuntimeNode(callableBuilder.Build(), false);
+ auto result = TRuntimeNode(callableBuilder.Build(), false);
+ if constexpr (RuntimeVersion < 65U) {
+ result = FromFlow(result);
+ }
+ return result;
}
TRuntimeNode TProgramBuilder::BuildBlockLogical(const std::string_view& callableName, TRuntimeNode first, TRuntimeNode second) {