diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-03-25 16:04:58 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-03-25 16:32:13 +0300 |
commit | ae51283471d012a56713063d298be892ef732b68 (patch) | |
tree | 90aacf0876ff932c9c9f4880887fb3d5f094071a | |
parent | 30c40b21dc1527fb15426ef6c9fe742aff8dadb0 (diff) | |
download | ydb-ae51283471d012a56713063d298be892ef732b68.tar.gz |
ListToBlocks computation node
Семантика ноды ListToBlocks — преобразование списка структур в список блочных структур: `List<Struct<a:T1,...>> -> List<Struct<a:Block<T1>,...,_yql_block_length:Scalar<Uint64>>>` (как WideToBlocks, но для списков).
На вход ожидается ленивый список, на выход также выдается ленивый список
commit_hash:0b364f55810a492fdcce7dc06b1e69701a1db01f
-rw-r--r-- | yql/essentials/core/sql_types/block.h | 7 | ||||
-rw-r--r-- | yql/essentials/core/sql_types/ya.make | 1 | ||||
-rw-r--r-- | yql/essentials/core/yql_expr_type_annotation.h | 3 | ||||
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_blocks.cpp | 236 | ||||
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_blocks.h | 1 | ||||
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_factory.cpp | 1 | ||||
-rw-r--r-- | yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp | 119 | ||||
-rw-r--r-- | yql/essentials/minikql/comp_nodes/ya.make.inc | 1 | ||||
-rw-r--r-- | yql/essentials/minikql/computation/mkql_block_impl.cpp | 28 | ||||
-rw-r--r-- | yql/essentials/minikql/computation/mkql_block_impl.h | 6 | ||||
-rw-r--r-- | yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt | 2 | ||||
-rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 37 | ||||
-rw-r--r-- | yql/essentials/minikql/mkql_program_builder.h | 2 | ||||
-rw-r--r-- | yql/essentials/minikql/mkql_runtime_version.h | 2 |
14 files changed, 430 insertions, 16 deletions
diff --git a/yql/essentials/core/sql_types/block.h b/yql/essentials/core/sql_types/block.h new file mode 100644 index 00000000000..ac08ce538d9 --- /dev/null +++ b/yql/essentials/core/sql_types/block.h @@ -0,0 +1,7 @@ +#pragma once + +#include <util/generic/strbuf.h> + +namespace NYql { + inline constexpr TStringBuf BlockLengthColumnName = "_yql_block_length"; +} diff --git a/yql/essentials/core/sql_types/ya.make b/yql/essentials/core/sql_types/ya.make index 136ff07733a..88eb76fb511 100644 --- a/yql/essentials/core/sql_types/ya.make +++ b/yql/essentials/core/sql_types/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + block.h match_recognize.h match_recognize.cpp simple_types.h diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h index 13907df704a..b429f46f394 100644 --- a/yql/essentials/core/yql_expr_type_annotation.h +++ b/yql/essentials/core/yql_expr_type_annotation.h @@ -5,6 +5,7 @@ #include <yql/essentials/ast/yql_expr.h> #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> +#include <yql/essentials/core/sql_types/block.h> #include <yql/essentials/minikql/mkql_type_ops.h> #include <yql/essentials/parser/pg_catalog/catalog.h> @@ -348,8 +349,6 @@ const TTypeAnnotationNode* GetOriginalResultType(TPositionHandle pos, bool isMan bool ApplyOriginalType(TExprNode::TPtr input, bool isMany, const TTypeAnnotationNode* originalExtractorType, TExprContext& ctx); TExprNode::TPtr ConvertToMultiLambda(const TExprNode::TPtr& lambda, TExprContext& ctx); -const TStringBuf BlockLengthColumnName = "_yql_block_length"; - TStringBuf NormalizeCallableName(TStringBuf name); void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx); diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp index a0ae63138c4..5f54771ab7f 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp +++ 
b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp @@ -9,9 +9,11 @@ #include <yql/essentials/minikql/arrow/arrow_util.h> #include <yql/essentials/minikql/mkql_type_builder.h> #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE +#include <yql/essentials/minikql/computation/mkql_custom_list.h> #include <yql/essentials/minikql/mkql_node_builder.h> #include <yql/essentials/minikql/mkql_node_cast.h> +#include <yql/essentials/core/sql_types/block.h> #include <yql/essentials/parser/pg_wrapper/interface/arrow.h> #include <arrow/scalar.h> @@ -461,6 +463,223 @@ private: const size_t MaxLength_; }; +class TListToBlocksState : public TBlockState { +public: + TListToBlocksState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types, size_t blockLengthIndex, size_t maxLength) + : TBlockState(memInfo, types.size(), blockLengthIndex) + , Builders_(types.size()) + , BlockLengthIndex_(blockLengthIndex) + , MaxLength_(maxLength) + { + for (size_t i = 0; i < types.size(); ++i) { + if (i == blockLengthIndex) { + continue; + } + Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), types[i], ctx.ArrowMemoryPool, maxLength, &ctx.Builder->GetPgBuilder(), &BuilderAllocatedSize_); + } + MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_; + } + + void AddRow(const NUdf::TUnboxedValuePod& row) { + auto items = row.GetElements(); + size_t inputStructIdx = 0; + for (size_t i = 0; i < Builders_.size(); i++) { + if (i == BlockLengthIndex_) { + continue; + } + Builders_[i]->Add(items[inputStructIdx++]); + } + Rows_++; + } + + void MakeBlocks(const THolderFactory& holderFactory) { + Values[BlockLengthIndex_] = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(Rows_))); + Rows_ = 0; + BuilderAllocatedSize_ = 0; + + for (size_t i = 0; i < Builders_.size(); ++i) { + if (i == BlockLengthIndex_) { + continue; + } + Values[i] = 
holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_)); + } + FillArrays(); + } + + void Finish() { + IsFinished_ = true; + } + + bool IsNotFull() const { + return Rows_ < MaxLength_ && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_; + } + + bool IsFinished() const { + return IsFinished_; + } + + bool HasBlocks() const { + return Count > 0; + } + + bool IsEmpty() const { + return Rows_ == 0; + } + +private: + size_t Rows_ = 0; + bool IsFinished_ = false; + + size_t BuilderAllocatedSize_ = 0; + size_t MaxBuilderAllocatedSize_ = 0; + + std::vector<std::unique_ptr<IArrayBuilder>> Builders_; + + const size_t BlockLengthIndex_; + const size_t MaxLength_; + static const size_t MaxAllocatedFactor_ = 4; +}; + +class TListToBlocksWrapper : public TMutableComputationNode<TListToBlocksWrapper> +{ + using TBaseComputation = TMutableComputationNode<TListToBlocksWrapper>; + +public: + TListToBlocksWrapper(TComputationMutables& mutables, + IComputationNode* list, + TStructType* structType + ) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , List_(list) + { + for (size_t i = 0; i < structType->GetMembersCount(); i++) { + if (structType->GetMemberName(i) == NYql::BlockLengthColumnName) { + BlockLengthIndex_ = i; + Types_.push_back(nullptr); + continue; + } + Types_.push_back(AS_TYPE(TBlockType, structType->GetMemberType(i))->GetItemType()); + } + + MaxLength_ = CalcBlockLen(std::accumulate(Types_.cbegin(), Types_.cend(), 0ULL, [](size_t max, const TType* type){ + if (!type) { + return max; + } + return std::max(max, CalcMaxBlockItemSize(type)); + })); + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const + { + return ctx.HolderFactory.Create<TListToBlocksValue>( + ctx, + Types_, + BlockLengthIndex_, + List_->GetValue(ctx), + MaxLength_ + ); + } + +private: + class TListToBlocksValue : public TCustomListValue { + using TState = TListToBlocksState; + + public: + class TIterator : public TComputationValue<TIterator> { + public: + 
TIterator(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& iter) + : TComputationValue<TIterator>(memInfo) + , HolderFactory_(holderFactory) + , BlockState_(std::move(blockState)) + , Iter_(std::move(iter)) + {} + + private: + bool Next(NUdf::TUnboxedValue& value) final { + auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get()); + const size_t structSize = blockState.Values.size(); + + if (!blockState.HasBlocks()) { + while (!blockState.IsFinished() && blockState.IsNotFull()) { + if (Iter_.Next(Row_)) { + blockState.AddRow(Row_); + } else { + blockState.Finish(); + } + } + if (blockState.IsEmpty()) { + return false; + } + blockState.MakeBlocks(HolderFactory_); + } + + NUdf::TUnboxedValue* items = nullptr; + value = HolderFactory_.CreateDirectArrayHolder(structSize, items); + + const auto sliceSize = blockState.Slice(); + for (size_t i = 0; i < structSize; i++) { + items[i] = blockState.Get(sliceSize, HolderFactory_, i); + } + + return true; + } + + private: + const THolderFactory& HolderFactory_; + + const NUdf::TUnboxedValue BlockState_; + const NUdf::TUnboxedValue Iter_; + + NUdf::TUnboxedValue Row_; + }; + + TListToBlocksValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, + const TVector<TType*>& types, ui32 blockLengthIndex, NUdf::TUnboxedValue&& list, size_t maxLength + ) + : TCustomListValue(memInfo) + , CompCtx_(ctx) + , Types_(types) + , BlockLengthIndex_(blockLengthIndex) + , List_(std::move(list)) + , MaxLength_(maxLength) + {} + + private: + NUdf::TUnboxedValue GetListIterator() const final { + auto state = CompCtx_.HolderFactory.Create<TState>(CompCtx_, Types_, BlockLengthIndex_, MaxLength_); + return CompCtx_.HolderFactory.Create<TIterator>(CompCtx_.HolderFactory, std::move(state), List_.GetListIterator()); + } + + bool HasListItems() const final { + if (!HasItems.has_value()) { + HasItems = List_.HasListItems(); + } + return *HasItems; + } + + private: + 
TComputationContext& CompCtx_; + + const TVector<TType*>& Types_; + size_t BlockLengthIndex_ = 0; + + NUdf::TUnboxedValue List_; + const size_t MaxLength_; + }; + + void RegisterDependencies() const final { + this->DependsOn(List_); + } + +private: + TVector<TType*> Types_; + size_t BlockLengthIndex_ = 0; + + IComputationNode* const List_; + + size_t MaxLength_ = 0; +}; + class TFromBlocksWrapper : public TStatefulFlowCodegeneratorNode<TFromBlocksWrapper> { using TBaseComputation = TStatefulFlowCodegeneratorNode<TFromBlocksWrapper>; public: @@ -1373,6 +1592,23 @@ IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFa return new TWideToBlocksFlowWrapper(ctx.Mutables, wideFlow, std::move(items)); } +IComputationNode* WrapListToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); + + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsList(), "Expected List as an input"); + const auto outputType = callable.GetType()->GetReturnType(); + MKQL_ENSURE(outputType->IsList(), "Expected List as an output"); + + const auto inputItemType = AS_TYPE(TListType, inputType)->GetItemType(); + MKQL_ENSURE(inputItemType->IsStruct(), "Expected List of Struct as an input"); + const auto outputItemType = AS_TYPE(TListType, outputType)->GetItemType(); + MKQL_ENSURE(outputItemType->IsStruct(), "Expected List of Struct as an output"); + + const auto list = LocateNode(ctx.NodeLocator, callable, 0); + return new TListToBlocksWrapper(ctx.Mutables, list, AS_TYPE(TStructType, outputItemType)); +} + IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.h b/yql/essentials/minikql/comp_nodes/mkql_blocks.h index 
46f39e71643..326e25e57d9 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_blocks.h +++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.h @@ -7,6 +7,7 @@ namespace NMiniKQL { IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapListToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx); diff --git a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp index c6b198463bf..9615a5d4d78 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp @@ -290,6 +290,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"WideTopBlocks", &WrapWideTopBlocks}, {"WideTopSortBlocks", &WrapWideTopSortBlocks}, {"WideSortBlocks", &WrapWideSortBlocks}, + {"ListToBlocks", &WrapListToBlocks}, {"AsScalar", &WrapAsScalar}, {"ReplicateScalar", &WrapReplicateScalar}, {"BlockCoalesce", &WrapBlockCoalesce}, diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp index a873fbeb5b9..15dd02bf30f 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp @@ -5,6 +5,7 @@ #include <arrow/compute/exec_internal.h> #include <arrow/array/builder_primitive.h> +#include <yql/essentials/core/sql_types/block.h> #include <yql/essentials/public/udf/udf_helpers.h> #include <yql/essentials/public/udf/arrow/udf_arrow_helpers.h> @@ -61,6 +62,24 @@ namespace { datums[i + 
topology->InputArgsCount] = output; } } + + // Hand-made variant using WideFromBlocks (in order to test ListToBlocks by well-tested nodes rather than actual ListFromBlocks) + TRuntimeNode ListFromBlocks(TProgramBuilder& pb, TRuntimeNode blockList) { + const auto wideBlocksStream = pb.FromFlow(pb.ExpandMap(pb.ToFlow(blockList), [&](TRuntimeNode item) -> TRuntimeNode::TList { + return { + pb.Member(item, "key"), + pb.Member(item, "value"), + pb.Member(item, NYql::BlockLengthColumnName) + }; + })); + + return pb.ForwardList(pb.NarrowMap(pb.ToFlow(pb.WideFromBlocks(wideBlocksStream)), [&](TRuntimeNode::TList items) -> TRuntimeNode { + return pb.NewStruct({ + {"key", items[0]}, + {"value", items[1]} + }); + })); + } } Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) { @@ -150,6 +169,106 @@ Y_UNIT_TEST_LLVM(TestWideToBlocks) { UNIT_ASSERT(!iterator.Next(item)); } +Y_UNIT_TEST(TestListToBlocks) { + constexpr size_t TEST_SIZE = 1 << 16; + const TString hugeString(128, '1'); + + TSetup<false> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id); + + const auto structType = pb.NewStructType({ + {"key", ui64Type}, + {"value", strType}, + }); + + TVector<TRuntimeNode> listItems; + for (size_t i = 0; i < TEST_SIZE; i++) { + const auto str = hugeString + ToString(i); + listItems.push_back(pb.NewStruct({ + {"key", pb.NewDataLiteral<ui64>(i)}, + // Huge string is used to make less rows fit into one block (in order to test output slicing) + {"value", pb.NewDataLiteral<NUdf::EDataSlot::String>(str)}, + })); + } + + const auto list = pb.NewList(structType, listItems); + const auto blockList = pb.ListToBlocks(list); + + const auto graph = setup.BuildGraph(ListFromBlocks(pb, blockList)); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue structValue; + for (size_t i = 0; i < TEST_SIZE; i++) { + const auto str = hugeString + 
ToString(i); + UNIT_ASSERT(iterator.Next(structValue)); + + const auto key = structValue.GetElement(0); + UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i); + const auto value = structValue.GetElement(1); + UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str); + } + + UNIT_ASSERT(!iterator.Next(structValue)); + UNIT_ASSERT(!iterator.Next(structValue)); +} + +Y_UNIT_TEST(TestListToBlocksMultiUsage) { + constexpr size_t TEST_SIZE = 1 << 10; + + TSetup<false> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id); + + const auto structType = pb.NewStructType({ + {"key", ui64Type}, + {"value", strType}, + }); + + TVector<TRuntimeNode> listItems; + for (size_t i = 0; i < TEST_SIZE; i++) { + const auto str = ToString(i); + listItems.push_back(pb.NewStruct({ + {"key", pb.NewDataLiteral<ui64>(i)}, + {"value", pb.NewDataLiteral<NUdf::EDataSlot::String>(str)}, + })); + } + + const auto list = pb.NewList(structType, listItems); + const auto blockList1 = pb.ListToBlocks(list); + const auto blockList2 = pb.ListToBlocks(list); + + const auto result = pb.Zip({ListFromBlocks(pb, blockList1), ListFromBlocks(pb, blockList2)}); + + const auto graph = setup.BuildGraph(result); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue tupleValue; + for (size_t i = 0; i < TEST_SIZE; i++) { + const auto str = ToString(i); + UNIT_ASSERT(iterator.Next(tupleValue)); + + auto structValue = tupleValue.GetElement(0); + auto key = structValue.GetElement(0); + UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i); + auto value = structValue.GetElement(1); + UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str); + + structValue = tupleValue.GetElement(1); + key = structValue.GetElement(0); + UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i); + value = structValue.GetElement(1); + 
UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str); + } + + UNIT_ASSERT(!iterator.Next(tupleValue)); + UNIT_ASSERT(!iterator.Next(tupleValue)); +} + namespace { template<bool LLVM> void TestChunked(bool withBlockExpand) { diff --git a/yql/essentials/minikql/comp_nodes/ya.make.inc b/yql/essentials/minikql/comp_nodes/ya.make.inc index 29656cf1285..01f4e8aac3f 100644 --- a/yql/essentials/minikql/comp_nodes/ya.make.inc +++ b/yql/essentials/minikql/comp_nodes/ya.make.inc @@ -162,6 +162,7 @@ PEERDIR( yql/essentials/types/binary_json yql/essentials/minikql yql/essentials/minikql/arrow + yql/essentials/core/sql_types yql/essentials/public/udf/arrow yql/essentials/parser/pg_wrapper/interface yql/essentials/utils diff --git a/yql/essentials/minikql/computation/mkql_block_impl.cpp b/yql/essentials/minikql/computation/mkql_block_impl.cpp index 49d52eb0b88..ba9238e45a7 100644 --- a/yql/essentials/minikql/computation/mkql_block_impl.cpp +++ b/yql/essentials/minikql/computation/mkql_block_impl.cpp @@ -125,7 +125,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo return arrow::Datum(scalar); } case NUdf::EDataSlot::TzDate: { - auto items = arrow::StructScalar::ValueType{ + auto items = arrow::StructScalar::ValueType{ std::make_shared<arrow::UInt16Scalar>(value.template Get<ui16>()), std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) }; @@ -133,7 +133,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate>())); } case NUdf::EDataSlot::TzDatetime: { - auto items = arrow::StructScalar::ValueType{ + auto items = arrow::StructScalar::ValueType{ std::make_shared<arrow::UInt32Scalar>(value.template Get<ui32>()), std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) }; @@ -141,7 +141,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo return 
arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime>())); } case NUdf::EDataSlot::TzTimestamp: { - auto items = arrow::StructScalar::ValueType{ + auto items = arrow::StructScalar::ValueType{ std::make_shared<arrow::UInt64Scalar>(value.template Get<ui64>()), std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) }; @@ -149,7 +149,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp>())); } case NUdf::EDataSlot::TzDate32: { - auto items = arrow::StructScalar::ValueType{ + auto items = arrow::StructScalar::ValueType{ std::make_shared<arrow::Int32Scalar>(value.template Get<i32>()), std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) }; @@ -157,7 +157,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate32>())); } case NUdf::EDataSlot::TzDatetime64: { - auto items = arrow::StructScalar::ValueType{ + auto items = arrow::StructScalar::ValueType{ std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()), std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) }; @@ -165,13 +165,13 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime64>())); } case NUdf::EDataSlot::TzTimestamp64: { - auto items = arrow::StructScalar::ValueType{ + auto items = arrow::StructScalar::ValueType{ std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()), std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) }; return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp64>())); - } + } case NUdf::EDataSlot::Decimal: { 
std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool))); *reinterpret_cast<NYql::NDecimal::TInt128*>(buffer->mutable_data()) = value.GetInt128(); @@ -338,9 +338,11 @@ const IComputationNode* TBlockFuncNode::TArrowNode::GetArgument(ui32 index) cons return Parent_->ArgsNodes[index]; } -TBlockState::TBlockState(TMemoryUsageInfo* memInfo, size_t width) - : TBase(memInfo), Values(width), Deques(width - 1ULL), Arrays(width - 1ULL) +TBlockState::TBlockState(TMemoryUsageInfo* memInfo, size_t width, i64 blockLengthIndex) + : TBase(memInfo), Values(width), Deques(width), Arrays(width) + , BlockLengthIndex_(blockLengthIndex == LAST_COLUMN_MARKER ? width - 1 : blockLengthIndex) { + MKQL_ENSURE(blockLengthIndex == LAST_COLUMN_MARKER || (0 <= blockLengthIndex && size_t(blockLengthIndex) < width), "Bad blockLengthIndex"); Pointer_ = Values.data(); } @@ -350,13 +352,17 @@ void TBlockState::ClearValues() { void TBlockState::FillArrays() { MKQL_ENSURE(Count == 0, "All existing arrays have to be processed"); - auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum(); + auto& counterDatum = TArrowBlock::From(Values[BlockLengthIndex_]).GetDatum(); MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)"); Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value; if (!Count) return; for (size_t i = 0U; i < Deques.size(); ++i) { + if (i == BlockLengthIndex_) { + continue; + } + Deques[i].clear(); if (const auto& value = Values[i]) { const auto& datum = TArrowBlock::From(value).GetDatum(); @@ -399,7 +405,7 @@ ui64 TBlockState::Slice() { } NUdf::TUnboxedValuePod TBlockState::Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const { - if (idx >= Deques.size()) + if (idx == BlockLengthIndex_) return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize))); if (auto array = Arrays[idx]) diff --git a/yql/essentials/minikql/computation/mkql_block_impl.h 
b/yql/essentials/minikql/computation/mkql_block_impl.h index c94047aa049..61964c030e0 100644 --- a/yql/essentials/minikql/computation/mkql_block_impl.h +++ b/yql/essentials/minikql/computation/mkql_block_impl.h @@ -90,6 +90,8 @@ private: }; struct TBlockState : public TComputationValue<TBlockState> { + static constexpr i64 LAST_COLUMN_MARKER = -1; + using TBase = TComputationValue<TBlockState>; ui64 Count = 0; @@ -99,7 +101,9 @@ struct TBlockState : public TComputationValue<TBlockState> { std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques; std::vector<std::shared_ptr<arrow::ArrayData>> Arrays; - TBlockState(TMemoryUsageInfo* memInfo, size_t width); + ui64 BlockLengthIndex_ = 0; + + TBlockState(TMemoryUsageInfo* memInfo, size_t width, i64 blockLengthIndex = LAST_COLUMN_MARKER); void ClearValues(); diff --git a/yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt b/yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt index afc435fb443..66c4d23bc22 100644 --- a/yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt +++ b/yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt @@ -38,7 +38,7 @@ namespace NKikimr::NMiniKQL { : TBase(context) , CountType(llvm::Type::getInt64Ty(Context)) , PointerType(llvm::PointerType::getUnqual(llvm::ArrayType::get(llvm::Type::getInt128Ty(Context), width))) - , SkipSpaceType(llvm::ArrayType::get(llvm::Type::getInt64Ty(Context), 9U)) // Skip std::vectors Values & Arrays + , SkipSpaceType(llvm::ArrayType::get(llvm::Type::getInt64Ty(Context), 10U)) // Skip Values, Deques, Arrays std::vectors and BlockLengthIndex ui64 {} }; #endif diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index a0b92b2ead4..27c4229f996 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -6,6 +6,7 @@ #include "yql/essentials/minikql/mkql_function_registry.h" #include 
"yql/essentials/minikql/mkql_utils.h" #include "yql/essentials/minikql/mkql_type_builder.h" +#include "yql/essentials/core/sql_types/block.h" #include "yql/essentials/core/sql_types/match_recognize.h" #include "yql/essentials/core/sql_types/time_order_recover.h" #include <yql/essentials/parser/pg_catalog/catalog.h> @@ -1493,6 +1494,42 @@ TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode stream) { return TRuntimeNode(callableBuilder.Build(), false); } +TType* TProgramBuilder::BuildBlockStructType(const TStructType* structType) { + std::vector<std::pair<std::string_view, TType*>> blockStructItems; + blockStructItems.reserve(structType->GetMembersCount() + 1); + for (size_t i = 0; i < structType->GetMembersCount(); i++) { + auto itemType = structType->GetMemberType(i); + MKQL_ENSURE(!itemType->IsBlock() , "Block types are not allowed here"); + blockStructItems.emplace_back( + structType->GetMemberName(i), + NewBlockType(itemType, TBlockType::EShape::Many) + ); + } + blockStructItems.emplace_back( + NYql::BlockLengthColumnName, + NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar) + ); + return NewStructType(blockStructItems); +} + +TRuntimeNode TProgramBuilder::ListToBlocks(TRuntimeNode list) { + if constexpr (RuntimeVersion < 60U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(list.GetStaticType()->IsList(), "Expected List as input type"); + const auto listType = AS_TYPE(TListType, list.GetStaticType()); + + MKQL_ENSURE(listType->GetItemType()->IsStruct(), "Expected List of Struct as input type"); + const auto itemStructType = AS_TYPE(TStructType, listType->GetItemType()); + + const auto itemBlockStructType = BuildBlockStructType(itemStructType); + + TCallableBuilder callableBuilder(Env, __func__, NewListType(itemBlockStructType)); + callableBuilder.Add(list); + return TRuntimeNode(callableBuilder.Build(), false); +} + TRuntimeNode 
TProgramBuilder::FromBlocks(TRuntimeNode flow) { auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType()); auto* blockType = AS_TYPE(TBlockType, flowType->GetItemType()); diff --git a/yql/essentials/minikql/mkql_program_builder.h b/yql/essentials/minikql/mkql_program_builder.h index 01c02232933..fb0b097dfaa 100644 --- a/yql/essentials/minikql/mkql_program_builder.h +++ b/yql/essentials/minikql/mkql_program_builder.h @@ -241,6 +241,7 @@ public: TRuntimeNode ToBlocks(TRuntimeNode flow); TRuntimeNode WideToBlocks(TRuntimeNode flow); + TRuntimeNode ListToBlocks(TRuntimeNode list); TRuntimeNode FromBlocks(TRuntimeNode flow); TRuntimeNode WideFromBlocks(TRuntimeNode flow); TRuntimeNode WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count); @@ -861,6 +862,7 @@ private: TType* ChooseCommonType(TType* type1, TType* type2); TType* BuildArithmeticCommonType(TType* type1, TType* type2); TType* BuildWideBlockType(const TArrayRef<TType* const>& wideComponents); + TType* BuildBlockStructType(const TStructType* structType); bool IsNull(TRuntimeNode arg); protected: diff --git a/yql/essentials/minikql/mkql_runtime_version.h b/yql/essentials/minikql/mkql_runtime_version.h index d0dc62b9390..df026ed9015 100644 --- a/yql/essentials/minikql/mkql_runtime_version.h +++ b/yql/essentials/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 59U +#define MKQL_RUNTIME_VERSION 60U #endif // History: |