diff options
| author | ziganshinmr <[email protected]> | 2025-03-27 14:59:42 +0300 |
|---|---|---|
| committer | ziganshinmr <[email protected]> | 2025-03-27 15:16:27 +0300 |
| commit | 3098b5d7f2fe6c0bbee56a7b57371dec369a9e1a (patch) | |
| tree | 81c68ec3e758783d71e03baa8f8ec509e89f8aa1 | |
| parent | e925ea9f73b6f4eda560bf872f261e671c82b2d2 (diff) | |
ListFromBlocks computation node
commit_hash:bae79a39ae78ceed103c460f7949d1a2483e0b73
| -rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_blocks.cpp | 219 | ||||
| -rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_blocks.h | 1 | ||||
| -rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_factory.cpp | 1 | ||||
| -rw-r--r-- | yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp | 131 | ||||
| -rw-r--r-- | yql/essentials/minikql/computation/mkql_block_impl.cpp | 6 | ||||
| -rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 37 | ||||
| -rw-r--r-- | yql/essentials/minikql/mkql_program_builder.h | 2 | ||||
| -rw-r--r-- | yql/essentials/minikql/mkql_runtime_version.h | 2 | ||||
| -rw-r--r-- | yql/essentials/minikql/mkql_type_builder.cpp | 42 | ||||
| -rw-r--r-- | yql/essentials/minikql/mkql_type_builder.h | 4 |
10 files changed, 382 insertions, 63 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp index 5f54771ab7f..6bf6bc38894 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp @@ -1186,6 +1186,208 @@ private: const TVector<TType*> Types_; }; +struct TListFromBlocksState : public TComputationValue<TListFromBlocksState> { +public: + TListFromBlocksState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types, size_t blockLengthIndex) + : TComputationValue(memInfo) + , HolderFactory_(ctx.HolderFactory) + , BlockLengthIndex_(blockLengthIndex) + , Readers_(types.size()) + , Converters_(types.size()) + , ValuesDescr_(ToValueDescr(types)) + { + const auto& pgBuilder = ctx.Builder->GetPgBuilder(); + for (size_t i = 0; i < types.size(); ++i) { + if (i == blockLengthIndex) { + continue; + } + const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType(); + Readers_[i] = MakeBlockReader(TTypeInfoHelper(), blockItemType); + Converters_[i] = MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder); + } + } + + NUdf::TUnboxedValue GetRow() { + MKQL_ENSURE(CurrentRow_ < RowCount_, "Rows out of range"); + + NUdf::TUnboxedValue* outItems = nullptr; + auto row = HolderFactory_.CreateDirectArrayHolder(Readers_.size() - 1, outItems); + + size_t outputStructIdx = 0; + for (size_t i = 0; i < Readers_.size(); i++) { + if (i == BlockLengthIndex_) { + continue; + } + + const auto& datum = TArrowBlock::From(BlockItems_[i]).GetDatum(); + ARROW_DEBUG_CHECK_DATUM_TYPES(ValuesDescr_[i], datum.descr()); + + TBlockItem item; + if (datum.is_scalar()) { + item = Readers_[i]->GetScalarItem(*datum.scalar()); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + item = Readers_[i]->GetItem(*datum.array(), CurrentRow_); + } + + outItems[outputStructIdx++] = Converters_[i]->MakeValue(item, HolderFactory_); + } + + CurrentRow_++; + return row; + } + + void SetBlock(NUdf::TUnboxedValue block) { + BlockItems_ = block.GetElements(); + Block_ = std::move(block); + + CurrentRow_ = 0; + RowCount_ = GetBlockCount(BlockItems_[BlockLengthIndex_]); + } + + bool HasRows() const { + return CurrentRow_ < RowCount_; + } + +private: + const THolderFactory& HolderFactory_; + + size_t CurrentRow_ = 0; + size_t RowCount_ = 0; + + size_t BlockLengthIndex_ = 0; + + NUdf::TUnboxedValue Block_; + const NUdf::TUnboxedValue* BlockItems_ = nullptr; + + std::vector<std::unique_ptr<IBlockReader>> Readers_; + std::vector<std::unique_ptr<IBlockItemConverter>> Converters_; + const std::vector<arrow::ValueDescr> ValuesDescr_; +}; + +class TListFromBlocksWrapper : public TMutableComputationNode<TListFromBlocksWrapper> +{ + using TBaseComputation = TMutableComputationNode<TListFromBlocksWrapper>; + +public: + TListFromBlocksWrapper(TComputationMutables& mutables, + IComputationNode* list, + TStructType* structType + ) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , List_(list) + { + for (size_t i = 0; i < structType->GetMembersCount(); i++) { + if (structType->GetMemberName(i) == NYql::BlockLengthColumnName) { + BlockLengthIndex_ = i; + Types_.push_back(nullptr); + continue; + } + Types_.push_back(structType->GetMemberType(i)); + } + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const + { + return ctx.HolderFactory.Create<TListFromBlocksValue>( + ctx, + Types_, + BlockLengthIndex_, + List_->GetValue(ctx) + ); + } + +private: + class TListFromBlocksValue : public TCustomListValue { + using TState = TListFromBlocksState; + + public: + class TIterator : public TComputationValue<TIterator> { + public: + TIterator(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& iter) + : TComputationValue<TIterator>(memInfo) + , BlockState_(std::move(blockState)) + , Iter_(std::move(iter)) + {} + + private: + bool Next(NUdf::TUnboxedValue& value) final { + auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get()); + if (!blockState.HasRows()) { + NUdf::TUnboxedValue block; + if (!Iter_.Next(block)) { + return false; + } + blockState.SetBlock(std::move(block)); + } + + value = blockState.GetRow(); + return true; + } + + private: + const NUdf::TUnboxedValue BlockState_; + const NUdf::TUnboxedValue Iter_; + }; + + TListFromBlocksValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, + const TVector<TType*>& types, ui32 blockLengthIndex, NUdf::TUnboxedValue&& list + ) + : TCustomListValue(memInfo) + , CompCtx_(ctx) + , Types_(types) + , BlockLengthIndex_(blockLengthIndex) + , List_(std::move(list)) + {} + + private: + NUdf::TUnboxedValue GetListIterator() const final { + auto state = CompCtx_.HolderFactory.Create<TState>(CompCtx_, Types_, BlockLengthIndex_); + return CompCtx_.HolderFactory.Create<TIterator>(std::move(state), List_.GetListIterator()); + } + + bool HasListItems() const final { + if (!HasItems.has_value()) { + HasItems = List_.HasListItems(); + } + return *HasItems; + } + + ui64 GetListLength() const final { + if (!Length.has_value()) { + auto iter = List_.GetListIterator(); + + Length = 0; + NUdf::TUnboxedValue block; + while (iter.Next(block)) { + auto blockLengthValue = block.GetElement(BlockLengthIndex_); + *Length += GetBlockCount(blockLengthValue); + } + } + + return *Length; + } + + private: + TComputationContext& CompCtx_; + + const TVector<TType*>& Types_; + size_t BlockLengthIndex_ = 0; + + NUdf::TUnboxedValue List_; + }; + + void RegisterDependencies() const final { + this->DependsOn(List_); + } + +private: + TVector<TType*> Types_; + size_t BlockLengthIndex_ = 0; + + IComputationNode* const List_; +}; + class TPrecomputedArrowNode : public IArrowKernelComputationNode { public: TPrecomputedArrowNode(const arrow::Datum& datum, TStringBuf kernelName) @@ -1645,6 +1847,23 @@ IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNode return new TWideFromBlocksFlowWrapper(ctx.Mutables, wideFlow, std::move(items)); } +IComputationNode* WrapListFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); + + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsList(), "Expected List as an input"); + const auto outputType = callable.GetType()->GetReturnType(); + MKQL_ENSURE(outputType->IsList(), "Expected List as an output"); + + const auto inputItemType = AS_TYPE(TListType, inputType)->GetItemType(); + MKQL_ENSURE(inputItemType->IsStruct(), "Expected List of Struct as an input"); + const auto outputItemType = AS_TYPE(TListType, outputType)->GetItemType(); + MKQL_ENSURE(outputItemType->IsStruct(), "Expected List of Struct as an output"); + + const auto list = LocateNode(ctx.NodeLocator, callable, 0); + return new TListFromBlocksWrapper(ctx.Mutables, list, AS_TYPE(TStructType, inputItemType)); +} + IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.h b/yql/essentials/minikql/comp_nodes/mkql_blocks.h index 326e25e57d9..fc718212091 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_blocks.h +++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.h @@ -10,6 +10,7 @@ IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFa IComputationNode* WrapListToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapListFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapReplicateScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputationNodeFactoryContext& ctx); diff --git a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp index 9615a5d4d78..e32aaf1fd79 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp @@ -291,6 +291,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"WideTopSortBlocks", &WrapWideTopSortBlocks}, {"WideSortBlocks", &WrapWideSortBlocks}, {"ListToBlocks", &WrapListToBlocks}, + {"ListFromBlocks", &WrapListFromBlocks}, {"AsScalar", &WrapAsScalar}, {"ReplicateScalar", &WrapReplicateScalar}, {"BlockCoalesce", &WrapBlockCoalesce}, diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp index 15dd02bf30f..c2faa29bed5 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp @@ -80,6 +80,71 @@ namespace { }); })); } + + // Hand-made variant using WideToBlocks (in order to test ListFromBlocks by well-tested nodes rather than actual ListToBlocks) + TRuntimeNode ListToBlocks(TProgramBuilder& pb, TRuntimeNode list) { + const auto wideBlocksStream = pb.WideToBlocks(pb.FromFlow(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { + return { + pb.Member(item, "key"), + pb.Member(item, "value") + }; + }))); + + return pb.Collect(pb.NarrowMap(pb.ToFlow(wideBlocksStream), [&](TRuntimeNode::TList items) -> TRuntimeNode { + return pb.NewStruct({ + {"key", items[0]}, + {"value", items[1]}, + {NYql::BlockLengthColumnName, items[2]} + }); + })); + } + + using TListTransformer = std::function<TRuntimeNode(TProgramBuilder&, TRuntimeNode)>; + + void DoTestListToAndFromBlocks(TSetup<false>& setup, TListTransformer listToBlocksImpl, TListTransformer listFromBlocksImpl) { + constexpr size_t TEST_SIZE = 1 << 16; + const TString hugeString(128, '1'); + + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id); + + const auto structType = pb.NewStructType({ + {"key", ui64Type}, + {"value", strType}, + }); + + TVector<TRuntimeNode> listItems; + for (size_t i = 0; i < TEST_SIZE; i++) { + const auto str = hugeString + ToString(i); + listItems.push_back(pb.NewStruct({ + {"key", pb.NewDataLiteral<ui64>(i)}, + // Huge string is used to make less rows fit into one block (in order to test output slicing) + {"value", pb.NewDataLiteral<NUdf::EDataSlot::String>(str)}, + })); + } + + const auto list = pb.NewList(structType, listItems); + const auto blockList = listToBlocksImpl(pb, list); + + const auto graph = setup.BuildGraph(listFromBlocksImpl(pb, blockList)); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue structValue; + for (size_t i = 0; i < TEST_SIZE; i++) { + const auto str = hugeString + ToString(i); + UNIT_ASSERT(iterator.Next(structValue)); + + const auto key = structValue.GetElement(0); + UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i); + const auto value = structValue.GetElement(1); + UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str); + } + + UNIT_ASSERT(!iterator.Next(structValue)); + UNIT_ASSERT(!iterator.Next(structValue)); + } } Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) { @@ -170,49 +235,13 @@ Y_UNIT_TEST_LLVM(TestWideToBlocks) { } Y_UNIT_TEST(TestListToBlocks) { - constexpr size_t TEST_SIZE = 1 << 16; - const TString hugeString(128, '1'); - TSetup<false> setup; - TProgramBuilder& pb = *setup.PgmBuilder; - const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); - const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id); - - const auto structType = pb.NewStructType({ - {"key", ui64Type}, - {"value", strType}, - }); - - TVector<TRuntimeNode> listItems; - for (size_t i = 0; i < TEST_SIZE; i++) { - const auto str = hugeString + ToString(i); - listItems.push_back(pb.NewStruct({ - {"key", pb.NewDataLiteral<ui64>(i)}, - // Huge string is used to make less rows fit into one block (in order to test output slicing) - {"value", pb.NewDataLiteral<NUdf::EDataSlot::String>(str)}, - })); - } - - const auto list = pb.NewList(structType, listItems); - const auto blockList = pb.ListToBlocks(list); - - const auto graph = setup.BuildGraph(ListFromBlocks(pb, blockList)); - const auto iterator = graph->GetValue().GetListIterator(); - - NUdf::TUnboxedValue structValue; - for (size_t i = 0; i < TEST_SIZE; i++) { - const auto str = hugeString + ToString(i); - UNIT_ASSERT(iterator.Next(structValue)); - - const auto key = structValue.GetElement(0); - UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i); - const auto value = structValue.GetElement(1); - UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str); - } - - UNIT_ASSERT(!iterator.Next(structValue)); - UNIT_ASSERT(!iterator.Next(structValue)); + DoTestListToAndFromBlocks( + setup, + [] (TProgramBuilder& pb, TRuntimeNode list) { return pb.ListToBlocks(list); }, + ListFromBlocks + ); } Y_UNIT_TEST(TestListToBlocksMultiUsage) { @@ -700,6 +729,16 @@ Y_UNIT_TEST_LLVM(TestWideFromBlocks) { UNIT_ASSERT(!iterator.Next(item)); } +Y_UNIT_TEST(TestListFromBlocks) { + TSetup<false> setup; + + DoTestListToAndFromBlocks( + setup, + ListToBlocks, + [] (TProgramBuilder& pb, TRuntimeNode list) { return pb.ListFromBlocks(list); } + ); +} + Y_UNIT_TEST_LLVM(TestWideToAndFromBlocks) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -741,6 +780,16 @@ Y_UNIT_TEST_LLVM(TestWideToAndFromBlocks) { UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } + +Y_UNIT_TEST(TestListToAndFromBlocks) { + TSetup<false> setup; + + DoTestListToAndFromBlocks( + setup, + [] (TProgramBuilder& pb, TRuntimeNode list) { return pb.ListToBlocks(list); }, + [] (TProgramBuilder& pb, TRuntimeNode list) { return pb.ListFromBlocks(list); } + ); +} } Y_UNIT_TEST_SUITE(TMiniKQLDirectKernelTest) { diff --git a/yql/essentials/minikql/computation/mkql_block_impl.cpp b/yql/essentials/minikql/computation/mkql_block_impl.cpp index ba9238e45a7..e960e8562e3 100644 --- a/yql/essentials/minikql/computation/mkql_block_impl.cpp +++ b/yql/essentials/minikql/computation/mkql_block_impl.cpp @@ -220,7 +220,11 @@ std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types) { std::vector<arrow::ValueDescr> res; res.reserve(types.size()); for (const auto& type : types) { - res.emplace_back(ToValueDescr(type)); + if (type) { + res.emplace_back(ToValueDescr(type)); + } else { + res.emplace_back(); + } } return res; diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index b11a2924c32..5e8e89da4ca 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -6,7 +6,6 @@ #include "yql/essentials/minikql/mkql_function_registry.h" #include "yql/essentials/minikql/mkql_utils.h" #include "yql/essentials/minikql/mkql_type_builder.h" -#include "yql/essentials/core/sql_types/block.h" #include "yql/essentials/core/sql_types/match_recognize.h" #include "yql/essentials/core/sql_types/time_order_recover.h" #include <yql/essentials/parser/pg_catalog/catalog.h> @@ -1494,24 +1493,6 @@ TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode stream) { return TRuntimeNode(callableBuilder.Build(), false); } -TType* TProgramBuilder::BuildBlockStructType(const TStructType* structType) { - std::vector<std::pair<std::string_view, TType*>> blockStructItems; - blockStructItems.reserve(structType->GetMembersCount() + 1); - for (size_t i = 0; i < structType->GetMembersCount(); i++) { - auto itemType = structType->GetMemberType(i); - MKQL_ENSURE(!itemType->IsBlock() , "Block types are not allowed here"); - blockStructItems.emplace_back( - structType->GetMemberName(i), - NewBlockType(itemType, TBlockType::EShape::Many) - ); - } - blockStructItems.emplace_back( - NYql::BlockLengthColumnName, - NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar) - ); - return NewStructType(blockStructItems); -} - TRuntimeNode TProgramBuilder::ListToBlocks(TRuntimeNode list) { if constexpr (RuntimeVersion < 60U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; @@ -1564,6 +1545,24 @@ TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode stream) { return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TProgramBuilder::ListFromBlocks(TRuntimeNode list) { + if constexpr (RuntimeVersion < 61U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(list.GetStaticType()->IsList(), "Expected List as input type"); + const auto listType = AS_TYPE(TListType, list.GetStaticType()); + + MKQL_ENSURE(listType->GetItemType()->IsStruct(), "Expected List of Struct as input type"); + const auto itemBlockStructType = AS_TYPE(TStructType, listType->GetItemType()); + + const auto itemStructType = ValidateBlockStructType(itemBlockStructType); + + TCallableBuilder callableBuilder(Env, __func__, NewListType(itemStructType)); + callableBuilder.Add(list); + return TRuntimeNode(callableBuilder.Build(), false); +} + TRuntimeNode TProgramBuilder::WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count) { return BuildWideSkipTakeBlocks(__func__, flow, count); } diff --git a/yql/essentials/minikql/mkql_program_builder.h b/yql/essentials/minikql/mkql_program_builder.h index fb0b097dfaa..f71d47a3872 100644 --- a/yql/essentials/minikql/mkql_program_builder.h +++ b/yql/essentials/minikql/mkql_program_builder.h @@ -244,6 +244,7 @@ public: TRuntimeNode ListToBlocks(TRuntimeNode list); TRuntimeNode FromBlocks(TRuntimeNode flow); TRuntimeNode WideFromBlocks(TRuntimeNode flow); + TRuntimeNode ListFromBlocks(TRuntimeNode list); TRuntimeNode WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count); TRuntimeNode WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count); TRuntimeNode WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys); @@ -862,7 +863,6 @@ private: TType* ChooseCommonType(TType* type1, TType* type2); TType* BuildArithmeticCommonType(TType* type1, TType* type2); TType* BuildWideBlockType(const TArrayRef<TType* const>& wideComponents); - TType* BuildBlockStructType(const TStructType* structType); bool IsNull(TRuntimeNode arg); protected: diff --git a/yql/essentials/minikql/mkql_runtime_version.h b/yql/essentials/minikql/mkql_runtime_version.h index df026ed9015..ed228c7b03d 100644 --- a/yql/essentials/minikql/mkql_runtime_version.h +++ b/yql/essentials/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 60U +#define MKQL_RUNTIME_VERSION 61U #endif // History: diff --git a/yql/essentials/minikql/mkql_type_builder.cpp b/yql/essentials/minikql/mkql_type_builder.cpp index e37bf91b91f..c9d6e363b4f 100644 --- a/yql/essentials/minikql/mkql_type_builder.cpp +++ b/yql/essentials/minikql/mkql_type_builder.cpp @@ -2806,6 +2806,48 @@ TType* TTypeBuilder::NewVariantType(TType* underlyingType) const { return TVariantType::Create(underlyingType, Env); } +TType* TTypeBuilder::ValidateBlockStructType(const TStructType* structType) const { + MKQL_ENSURE(structType->GetMembersCount() > 0, "Expected at least one column"); + + std::vector<std::pair<std::string_view, TType*>> outStructItems; + outStructItems.reserve(structType->GetMembersCount() - 1); + bool hasBlockLengthColumn = false; + for (size_t i = 0; i < structType->GetMembersCount(); i++) { + auto blockType = AS_TYPE(TBlockType, structType->GetMemberType(i)); + bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar; + auto itemType = blockType->GetItemType(); + if (structType->GetMemberName(i) == NYql::BlockLengthColumnName) { + MKQL_ENSURE(isScalar, "Block length column should be scalar"); + MKQL_ENSURE(AS_TYPE(TDataType, itemType)->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected Uint64"); + + MKQL_ENSURE(!hasBlockLengthColumn, "Block struct must contain only one block length column"); + hasBlockLengthColumn = true; + } else { + outStructItems.emplace_back(structType->GetMemberName(i), itemType); + } + } + MKQL_ENSURE(hasBlockLengthColumn, "Block struct must contain block length column"); + return NewStructType(outStructItems); +} + +TType* TTypeBuilder::BuildBlockStructType(const TStructType* structType) const { + std::vector<std::pair<std::string_view, TType*>> blockStructItems; + blockStructItems.reserve(structType->GetMembersCount() + 1); + for (size_t i = 0; i < structType->GetMembersCount(); i++) { + auto itemType = structType->GetMemberType(i); + MKQL_ENSURE(!itemType->IsBlock() , "Block types are not allowed here"); + blockStructItems.emplace_back( + structType->GetMemberName(i), + NewBlockType(itemType, TBlockType::EShape::Many) + ); + } + blockStructItems.emplace_back( + NYql::BlockLengthColumnName, + NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar) + ); + return NewStructType(blockStructItems); +} + void RebuildTypeIndex() { HugeSingleton<TPgTypeIndex>()->Rebuild(); } diff --git a/yql/essentials/minikql/mkql_type_builder.h b/yql/essentials/minikql/mkql_type_builder.h index f1ec8a9e4f3..678db9157ca 100644 --- a/yql/essentials/minikql/mkql_type_builder.h +++ b/yql/essentials/minikql/mkql_type_builder.h @@ -2,6 +2,7 @@ #include "mkql_node.h" +#include <yql/essentials/core/sql_types/block.h> #include <yql/essentials/public/udf/udf_type_builder.h> #include <yql/essentials/public/udf/arrow/block_type_helper.h> #include <yql/essentials/parser/pg_wrapper/interface/compare.h> @@ -325,6 +326,9 @@ public: TType* NewResourceType(const std::string_view& tag) const; TType* NewVariantType(TType* underlyingType) const; + TType* BuildBlockStructType(const TStructType* structType) const; + TType* ValidateBlockStructType(const TStructType* structType) const; + protected: const TTypeEnvironment& Env; bool UseNullType = true; |
