summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authorziganshinmr <[email protected]>2025-03-25 16:04:58 +0300
committerziganshinmr <[email protected]>2025-03-25 16:32:13 +0300
commitae51283471d012a56713063d298be892ef732b68 (patch)
tree90aacf0876ff932c9c9f4880887fb3d5f094071a /yql/essentials/minikql/mkql_program_builder.cpp
parent30c40b21dc1527fb15426ef6c9fe742aff8dadb0 (diff)
ListToBlocks computation node
Семантика ноды ListToBlocks - преобразование списка структур в список блочных структур:`List<Struct<a:T1,...>> -> List<Struct<a:Block`<T1>`, ... ,_yql_block_len:Scalar`<Uint64>`>` (как WideToBlocks, но для списков) На вход ожидается ленивый список, на выход также выдается ленивый список commit_hash:0b364f55810a492fdcce7dc06b1e69701a1db01f
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp37
1 files changed, 37 insertions, 0 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index a0b92b2ead4..27c4229f996 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -6,6 +6,7 @@
#include "yql/essentials/minikql/mkql_function_registry.h"
#include "yql/essentials/minikql/mkql_utils.h"
#include "yql/essentials/minikql/mkql_type_builder.h"
+#include "yql/essentials/core/sql_types/block.h"
#include "yql/essentials/core/sql_types/match_recognize.h"
#include "yql/essentials/core/sql_types/time_order_recover.h"
#include <yql/essentials/parser/pg_catalog/catalog.h>
@@ -1493,6 +1494,42 @@ TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode stream) {
return TRuntimeNode(callableBuilder.Build(), false);
}
+TType* TProgramBuilder::BuildBlockStructType(const TStructType* structType) {
+ std::vector<std::pair<std::string_view, TType*>> blockStructItems;
+ blockStructItems.reserve(structType->GetMembersCount() + 1);
+ for (size_t i = 0; i < structType->GetMembersCount(); i++) {
+ auto itemType = structType->GetMemberType(i);
+ MKQL_ENSURE(!itemType->IsBlock() , "Block types are not allowed here");
+ blockStructItems.emplace_back(
+ structType->GetMemberName(i),
+ NewBlockType(itemType, TBlockType::EShape::Many)
+ );
+ }
+ blockStructItems.emplace_back(
+ NYql::BlockLengthColumnName,
+ NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)
+ );
+ return NewStructType(blockStructItems);
+}
+
+TRuntimeNode TProgramBuilder::ListToBlocks(TRuntimeNode list) {
+ if constexpr (RuntimeVersion < 60U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ MKQL_ENSURE(list.GetStaticType()->IsList(), "Expected List as input type");
+ const auto listType = AS_TYPE(TListType, list.GetStaticType());
+
+ MKQL_ENSURE(listType->GetItemType()->IsStruct(), "Expected List of Struct as input type");
+ const auto itemStructType = AS_TYPE(TStructType, listType->GetItemType());
+
+ const auto itemBlockStructType = BuildBlockStructType(itemStructType);
+
+ TCallableBuilder callableBuilder(Env, __func__, NewListType(itemBlockStructType));
+ callableBuilder.Add(list);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) {
auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType());
auto* blockType = AS_TYPE(TBlockType, flowType->GetItemType());