diff options
author | aneporada <aneporada@ydb.tech> | 2022-12-20 18:13:56 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2022-12-20 18:13:56 +0300 |
commit | f37198778abfc9966a20b45117657a92c31d0e17 (patch) | |
tree | 717d17623fc3119dea8172f4652c3bd83ad040c5 | |
parent | 1e90d439ad65bba680872176ad4ad3b3f87224d1 (diff) | |
download | ydb-f37198778abfc9966a20b45117657a92c31d0e17.tar.gz |
Initial support for strings in blocks: BlockReader/BlockBuilder
-rw-r--r-- | ydb/library/yql/ast/yql_expr.h | 22 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_wide.cpp | 16 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.cpp | 22 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.h | 6 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt | 2 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt | 2 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt | 2 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp | 462 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h | 27 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp | 216 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h | 22 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp | 549 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_type_builder.cpp | 6 |
13 files changed, 886 insertions, 468 deletions
diff --git a/ydb/library/yql/ast/yql_expr.h b/ydb/library/yql/ast/yql_expr.h index 7ec95914503..6c77bbb30f9 100644 --- a/ydb/library/yql/ast/yql_expr.h +++ b/ydb/library/yql/ast/yql_expr.h @@ -132,7 +132,8 @@ enum ETypeAnnotationFlags : ui32 { TypeHasManyValues = 0x200, TypeHasBareYson = 0x400, TypeHasNestedOptional = 0x800, - TypeNonPresortable = 0x1000 + TypeNonPresortable = 0x1000, + TypeHasDynamicSize = 0x2000, }; const ui64 TypeHashMagic = 0x10000; @@ -235,6 +236,15 @@ public: return kind == ETypeAnnotationKind::Optional || kind == ETypeAnnotationKind::Null || kind == ETypeAnnotationKind::Pg; } + bool IsAnyBlockOrScalar() const { + auto kind = GetKind(); + return kind == ETypeAnnotationKind::Block || kind == ETypeAnnotationKind::ChunkedBlock || kind == ETypeAnnotationKind::Scalar; + } + + bool HasFixedSizeRepr() const { + return (GetFlags() & (TypeHasDynamicSize | TypeNonPersistable | TypeNonComputable)) == 0; + } + bool IsSingleton() const { return (GetFlags() & TypeHasManyValues) == 0; } @@ -551,7 +561,7 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::List; TListExprType(ui64 hash, const TTypeAnnotationNode* itemType) - : TTypeAnnotationNode(KindValue, itemType->GetFlags(), hash) + : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeHasDynamicSize, hash) , ItemType(itemType) { } @@ -737,6 +747,10 @@ public: ret |= TypeHasBareYson; } + if (props & NUdf::StringType) { + ret |= TypeHasDynamicSize; + } + return ret; } @@ -801,6 +815,7 @@ class TPgExprType : public TTypeAnnotationNode { public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Pg; + // TODO: TypeHasDynamicSize for Pg types TPgExprType(ui64 hash, ui32 typeId) : TTypeAnnotationNode(KindValue, GetFlags(typeId), hash) , TypeId(typeId) @@ -953,7 +968,8 @@ public: static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Dict; TDictExprType(ui64 hash, const TTypeAnnotationNode* keyType, const TTypeAnnotationNode* payloadType) - : TTypeAnnotationNode(KindValue, TypeNonComparable | keyType->GetFlags() | payloadType->GetFlags(), hash) + : TTypeAnnotationNode(KindValue, TypeNonComparable | TypeHasDynamicSize | + keyType->GetFlags() | payloadType->GetFlags(), hash) , KeyType(keyType) , PayloadType(payloadType) { diff --git a/ydb/library/yql/core/type_ann/type_ann_wide.cpp b/ydb/library/yql/core/type_ann/type_ann_wide.cpp index 6e51ff57c79..a60ed79c840 100644 --- a/ydb/library/yql/core/type_ann/type_ann_wide.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_wide.cpp @@ -638,12 +638,21 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); TTypeAnnotationNode::TListType retMultiType; for (const auto& type : multiType->GetItems()) { - if (type->GetKind() == ETypeAnnotationKind::Block || type->GetKind() == ETypeAnnotationKind::Scalar) { + if (type->IsAnyBlockOrScalar()) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar")); return IGraphTransformer::TStatus::Error; } - retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type)); + if (!EnsurePersistableType(input->Pos(), *type, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (type->HasFixedSizeRepr()) { + retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type)); + } else { + retMultiType.push_back(ctx.Expr.MakeType<TChunkedBlockExprType>(type)); + } + } retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); @@ -659,7 +668,8 @@ IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, T } TTypeAnnotationNode::TListType retMultiType; - if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) { + const bool allowChunked = true; + if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr, allowChunked)) { return IGraphTransformer::TStatus::Error; } diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp index 008dcc700e9..07688adebf0 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.cpp +++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp @@ -2697,7 +2697,7 @@ bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& typ return true; } -bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx) { +bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowChunked) { if (!EnsureWideFlowType(node, ctx)) { return false; } @@ -2710,7 +2710,7 @@ bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListTy bool isScalar; for (const auto& type : items) { - if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx)) { + if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx, allowChunked)) { return false; } @@ -5328,18 +5328,21 @@ bool HasContextFuncs(const TExprNode& input) { return needCtx; } -bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx) { +bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx, bool allowChunked) { if (HasError(node.GetTypeAnn(), ctx) || !node.GetTypeAnn()) { YQL_ENSURE(node.Type() == TExprNode::Lambda); ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected block or scalar type, but got lambda")); return false; } - return EnsureBlockOrScalarType(node.Pos(), *node.GetTypeAnn(), ctx); + return EnsureBlockOrScalarType(node.Pos(), *node.GetTypeAnn(), ctx, allowChunked); } -bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx) { - if (HasError(&type, ctx) || (type.GetKind() != ETypeAnnotationKind::Block && type.GetKind() != ETypeAnnotationKind::Scalar)) { +bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx, bool allowChunked) { + bool valid = allowChunked ? type.IsAnyBlockOrScalar() : + (type.GetKind() == ETypeAnnotationKind::Block || + type.GetKind() == ETypeAnnotationKind::Scalar); + if (HasError(&type, ctx) || !valid) { ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected block or scalar type, but got: " << type)); return false; } @@ -5348,11 +5351,14 @@ bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode } const TTypeAnnotationNode* GetBlockItemType(const TTypeAnnotationNode& type, bool& isScalar) { - auto kind = type.GetKind(); - YQL_ENSURE(kind == ETypeAnnotationKind::Block || kind == ETypeAnnotationKind::Scalar); + YQL_ENSURE(type.IsAnyBlockOrScalar()); + const auto kind = type.GetKind(); if (kind == ETypeAnnotationKind::Block) { isScalar = false; return type.Cast<TBlockExprType>()->GetItemType(); + } else if (kind == ETypeAnnotationKind::ChunkedBlock) { + isScalar = false; + return type.Cast<TChunkedBlockExprType>()->GetItemType(); } else { isScalar = true; return type.Cast<TScalarExprType>()->GetItemType(); diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h index e6a19bb7238..9a0fa776425 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.h +++ b/ydb/library/yql/core/yql_expr_type_annotation.h @@ -118,7 +118,7 @@ bool EnsureFlowType(const TExprNode& node, TExprContext& ctx); bool EnsureFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); bool EnsureWideFlowType(const TExprNode& node, TExprContext& ctx); bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); -bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx); +bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowChunked = false); bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx); bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); bool EnsureType(const TExprNode& node, TExprContext& ctx); @@ -297,8 +297,8 @@ bool IsCallableTypeHasStreams(const TCallableExprType* callableType); bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx); bool HasContextFuncs(const TExprNode& input); -bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx); -bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); +bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx, bool allowChunked = false); +bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx, bool allowChunked = false); const TTypeAnnotationNode* GetBlockItemType(const TTypeAnnotationNode& type, bool& isScalar); const TTypeAnnotationNode* AggApplySerializedStateType(const TExprNode::TPtr& input, TExprContext& ctx); diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt index bac7a8573ad..2e6b5588c39 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt @@ -40,10 +40,12 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt index 30b1ee690cc..303a6c6b5e3 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt @@ -41,10 +41,12 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt index 30b1ee690cc..303a6c6b5e3 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt @@ -41,10 +41,12 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp new file mode 100644 index 00000000000..958f421f2b3 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp @@ -0,0 +1,462 @@ +#include "mkql_block_builder.h" + +#include <ydb/library/yql/minikql/arrow/arrow_defs.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> + +#include <arrow/array/builder_primitive.h> +#include <arrow/chunked_array.h> + + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +bool AlwaysUseChunks(const TType* type) { + if (type->IsOptional()) { + type = AS_TYPE(TOptionalType, type)->GetItemType(); + } + + if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + if (AlwaysUseChunks(tupleType->GetElementType(i))) { + return true; + } + } + return false; + } + + if (type->IsData()) { + auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); + return (GetDataTypeInfo(slot).Features & NYql::NUdf::EDataTypeFeatures::StringType) != 0u; + } + + MKQL_ENSURE(false, "Unsupported type"); +} + +// size of each block item in bytes +size_t CalcItemSize(const TType* type) { + // we do not count block bitmap size + if (type->IsOptional()) { + type = AS_TYPE(TOptionalType, type)->GetItemType(); + } + + if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + size_t result = 0; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + result += CalcItemSize(tupleType->GetElementType(i)); + } + return result; + } + + if (type->IsData()) { + auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); + switch (slot) { + case NUdf::EDataSlot::Int8: + case NUdf::EDataSlot::Uint8: + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Int16: + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + case NUdf::EDataSlot::Int32: + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + case NUdf::EDataSlot::Float: + case NUdf::EDataSlot::Double: { + size_t sz = GetDataTypeInfo(slot).FixedSize; + MKQL_ENSURE(sz > 0, "Unexpected fixed data size"); + return sz; + } + case NUdf::EDataSlot::String: + // size of offset part + return sizeof(arrow::BinaryType::offset_type); + case NUdf::EDataSlot::Utf8: + // size of offset part + return sizeof(arrow::StringType::offset_type); + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } + } + + MKQL_ENSURE(false, "Unsupported type"); +} + +std::shared_ptr<arrow::DataType> GetArrowType(TType* type) { + std::shared_ptr<arrow::DataType> result; + bool isOptional; + Y_VERIFY(ConvertArrowType(type, isOptional, result)); + return result; +} + +class TBlockBuilderBase : public IBlockBuilder { +public: + using Ptr = std::unique_ptr<TBlockBuilderBase>; + + struct TBlockArrayTree { + using Ptr = std::shared_ptr<TBlockArrayTree>; + std::deque<std::shared_ptr<arrow::ArrayData>> Payload; + std::vector<TBlockArrayTree::Ptr> Children; + }; + + + TBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxLen) + : Type(type) + , Pool(&pool) + , MaxLen(maxLen) + { + Y_VERIFY(type); + Y_VERIFY(maxLen > 0); + } + + size_t MaxLength() const final { + return MaxLen; + } + + void Add(const NUdf::TUnboxedValue& value) final { + Y_VERIFY(CurrLen < MaxLen); + DoAdd(value); + CurrLen++; + } + + NUdf::TUnboxedValuePod Build(TComputationContext& ctx, bool finish) final { + auto tree = BuildTree(finish); + CurrLen = 0; + arrow::ArrayVector chunks; + while (size_t size = CalcSliceSize(*tree)) { + std::shared_ptr<arrow::ArrayData> data = Slice(*tree, size); + chunks.push_back(arrow::Datum(data).make_array()); + } + + Y_VERIFY(!chunks.empty()); + + if (chunks.size() > 1 || AlwaysUseChunks(Type)) { + auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(chunks), GetArrowType(Type))); + return ctx.HolderFactory.CreateArrowBlock(std::move(chunked)); + } + return ctx.HolderFactory.CreateArrowBlock(chunks.front()); + } + + virtual TBlockArrayTree::Ptr BuildTree(bool finish) = 0; + virtual void DoAdd(const NUdf::TUnboxedValue& value) = 0; + +private: + static size_t CalcSliceSize(const TBlockArrayTree& tree) { + if (tree.Payload.empty()) { + return 0; + } + + if (!tree.Children.empty()) { + Y_VERIFY(tree.Payload.size() == 1); + size_t result = std::numeric_limits<size_t>::max(); + for (auto& child : tree.Children) { + size_t childSize = CalcSliceSize(*child); + result = std::min(result, childSize); + } + Y_VERIFY(result <= tree.Payload.front()->length); + return result; + } + + int64_t result = std::numeric_limits<int64_t>::max(); + for (auto& data : tree.Payload) { + result = std::min(result, data->length); + } + + Y_VERIFY(result > 0); + return static_cast<size_t>(result); + } + + static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) { + Y_VERIFY(size > 0); + + Y_VERIFY(!tree.Payload.empty()); + auto& main = tree.Payload.front(); + std::shared_ptr<arrow::ArrayData> sliced; + if (size == main->length) { + sliced = main; + tree.Payload.pop_front(); + } else { + Y_VERIFY(size < main->length); + sliced = main->Slice(0, size); + main = main->Slice(size, main->length - size); + } + + if (!tree.Children.empty()) { + std::vector<std::shared_ptr<arrow::ArrayData>> children; + for (auto& child : tree.Children) { + children.push_back(Slice(*child, size)); + } + + sliced->child_data = std::move(children); + if (tree.Payload.empty()) { + tree.Children.clear(); + } + } + return sliced; + } + +protected: + TType* const Type; + arrow::MemoryPool* const Pool; + const size_t MaxLen; +private: + size_t CurrLen = 0; +}; + +template <typename T, typename TBuilder> +class TFixedSizeBlockBuilder : public TBlockBuilderBase { +public: + TFixedSizeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen) + : TBlockBuilderBase(type, pool, maxLen) + , Builder(std::make_unique<TBuilder>(&pool)) + { + Reserve(); + } + + void DoAdd(const NUdf::TUnboxedValue& value) final { + if (value) { + Builder->UnsafeAppend(value.Get<T>()); + } else { + Builder->UnsafeAppendNull(); + } + } + + TBlockArrayTree::Ptr BuildTree(bool finish) final { + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + result->Payload.emplace_back(); + ARROW_OK(Builder->FinishInternal(&result->Payload.back())); + Builder.reset(); + if (!finish) { + Builder = std::make_unique<TBuilder>(Pool); + Reserve(); + } + return result; + } + +private: + void Reserve() { + ARROW_OK(Builder->Reserve(MaxLen)); + } + + std::unique_ptr<TBuilder> Builder; +}; + +template<typename TStringType> +class TStringBlockBuilder : public TBlockBuilderBase { +public: + TStringBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen) + : TBlockBuilderBase(type, pool, maxLen) + { + Reserve(); + } + + void DoAdd(const NUdf::TUnboxedValue& value) final { + if (!value) { + NullBitmapBuilder->UnsafeAppend(false); + AppendCurrentOffset(); + NullCount++; + return; + } + + const TStringBuf str = value.AsStringRef(); + + size_t currentLen = DataBuilder->length(); + Y_VERIFY(currentLen <= MaxBlockSizeInBytes); + if (str.size() > (MaxBlockSizeInBytes - currentLen)) { + MKQL_ENSURE(str.size() < std::numeric_limits<typename TStringType::offset_type>::max(), "Too big string for Arrow"); + FlushChunk(false); + } + + NullBitmapBuilder->UnsafeAppend(true); + AppendCurrentOffset(); + DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size()); + } + + TBlockArrayTree::Ptr BuildTree(bool finish) final { + FlushChunk(finish); + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + result->Payload = std::move(Chunks); + Chunks.clear(); + return result; + } + +private: + void Reserve() { + NullBitmapBuilder = std::make_unique<arrow::TypedBufferBuilder<bool>>(Pool); + OffsetsBuilder = std::make_unique<arrow::TypedBufferBuilder<typename TStringType::offset_type>>(Pool); + DataBuilder = std::make_unique<arrow::TypedBufferBuilder<ui8>>(Pool); + ARROW_OK(NullBitmapBuilder->Reserve(MaxLen)); + ARROW_OK(OffsetsBuilder->Reserve(MaxLen + 1)); + ARROW_OK(DataBuilder->Reserve(MaxBlockSizeInBytes)); + NullCount = 0; + } + + void AppendCurrentOffset() { + OffsetsBuilder->UnsafeAppend(DataBuilder->length()); + } + + void FlushChunk(bool finish) { + const auto length = NullBitmapBuilder->length(); + Y_VERIFY(length > 0); + + AppendCurrentOffset(); + std::shared_ptr<arrow::Buffer> nullBitmap; + std::shared_ptr<arrow::Buffer> offsets; + std::shared_ptr<arrow::Buffer> data; + if (NullCount) { + ARROW_OK(NullBitmapBuilder->Finish(&nullBitmap)); + } + ARROW_OK(OffsetsBuilder->Finish(&offsets)); + ARROW_OK(DataBuilder->Finish(&data)); + auto arrowType = std::make_shared<TStringType>(); + Chunks.push_back(arrow::ArrayData::Make(std::make_shared<TStringType>(), length, + { nullBitmap, offsets, data }, NullCount, 0)); + if (!finish) { + Reserve(); + } + } + + std::unique_ptr<arrow::TypedBufferBuilder<bool>> NullBitmapBuilder; + std::unique_ptr<arrow::TypedBufferBuilder<typename TStringType::offset_type>> OffsetsBuilder; + std::unique_ptr<arrow::TypedBufferBuilder<ui8>> DataBuilder; + std::deque<std::shared_ptr<arrow::ArrayData>> Chunks; + size_t NullCount = 0; +}; + +class TTupleBlockBuilder : public TBlockBuilderBase { +public: + TTupleBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen, + TVector<TBlockBuilderBase::Ptr>&& children) + : TBlockBuilderBase(type, pool, maxLen) + , Children(std::move(children)) + { + Reserve(); + } + + void DoAdd(const NUdf::TUnboxedValue& value) final { + auto tupleType = AS_TYPE(TTupleType, Type); + if (!value) { + NullBitmapBuilder->UnsafeAppend(false); + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + Children[i]->DoAdd({}); + } + return; + } + + NullBitmapBuilder->UnsafeAppend(true); + auto elements = value.GetElements(); + if (elements) { + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + Children[i]->DoAdd(elements[i]); + } + } else { + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + Children[i]->DoAdd(value.GetElement(i)); + } + } + } + + TBlockArrayTree::Ptr BuildTree(bool finish) final { + auto tupleType = AS_TYPE(TTupleType, Type); + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + + std::shared_ptr<arrow::Buffer> nullBitmap; + auto length = NullBitmapBuilder->length(); + ARROW_OK(NullBitmapBuilder->Finish(&nullBitmap)); + result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), length, { nullBitmap })); + result->Children.reserve(tupleType->GetElementsCount()); + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + result->Children.emplace_back(Children[i]->BuildTree(finish)); + } + + if (!finish) { + Reserve(); + } + + return result; + } + +private: + void Reserve() { + NullBitmapBuilder = std::make_unique<arrow::TypedBufferBuilder<bool>>(Pool); + ARROW_OK(NullBitmapBuilder->Reserve(MaxLen)); + } + +private: + TVector<std::unique_ptr<TBlockBuilderBase>> Children; + std::unique_ptr<arrow::TypedBufferBuilder<bool>> NullBitmapBuilder; +}; + +std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen) { + if (type->IsOptional()) { + type = AS_TYPE(TOptionalType, type)->GetItemType(); + } + + if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + TVector<std::unique_ptr<TBlockBuilderBase>> children; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + children.emplace_back(MakeBlockBuilder(tupleType->GetElementType(i), pool, maxLen)); + } + + return std::make_unique<TTupleBlockBuilder>(tupleType, pool, maxLen, std::move(children)); + } + + if (type->IsData()) { + auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); + switch (slot) { + case NUdf::EDataSlot::Int8: + return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(type, pool, maxLen); + case NUdf::EDataSlot::Uint8: + case NUdf::EDataSlot::Bool: + return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(type, pool, maxLen); + case NUdf::EDataSlot::Int16: + return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(type, pool, maxLen); + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(type, pool, maxLen); + case NUdf::EDataSlot::Int32: + return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(type, pool, maxLen); + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(type, pool, maxLen); + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(type, pool, maxLen); + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(type, pool, maxLen); + case NUdf::EDataSlot::Float: + return std::make_unique<TFixedSizeBlockBuilder<float, arrow::FloatBuilder>>(type, pool, maxLen); + case NUdf::EDataSlot::Double: + return std::make_unique<TFixedSizeBlockBuilder<double, arrow::DoubleBuilder>>(type, pool, maxLen); + case NUdf::EDataSlot::String: + return std::make_unique<TStringBlockBuilder<arrow::BinaryType>>(type, pool, maxLen); + case NUdf::EDataSlot::Utf8: + return std::make_unique<TStringBlockBuilder<arrow::StringType>>(type, pool, maxLen); + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } + } + + MKQL_ENSURE(false, "Unsupported type"); +} + +} // namespace + +std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool) { + const auto itemSize = std::max<size_t>(CalcItemSize(type), 1); + const auto maxLen = std::max<size_t>(MaxBlockSizeInBytes / itemSize, 1); + return MakeBlockBuilder(type, pool, maxLen); +} + + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h new file mode 100644 index 00000000000..fb64a619e06 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h @@ -0,0 +1,27 @@ +#pragma once + +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> + +#include <util/generic/size_literals.h> + +#include <limits> + +namespace NKikimr { +namespace NMiniKQL { + +constexpr size_t MaxBlockSizeInBytes = 1_MB; +static_assert(MaxBlockSizeInBytes < (size_t)std::numeric_limits<i32>::max()); + +class IBlockBuilder { +public: + virtual ~IBlockBuilder() = default; + virtual size_t MaxLength() const = 0; + virtual void Add(const NUdf::TUnboxedValue& value) = 0; + virtual NUdf::TUnboxedValuePod Build(TComputationContext& ctx, bool finish) = 0; +}; + +std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool); + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp new file mode 100644 index 00000000000..a39c38d2cd9 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp @@ -0,0 +1,216 @@ +#include "mkql_block_reader.h" + +#include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/minikql/mkql_node_builder.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> + +#include <arrow/array/array_binary.h> +#include <arrow/chunked_array.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +class TBlockReaderBase : public IBlockReader { +public: + using Ptr = std::unique_ptr<TBlockReaderBase>; + + void Reset(const arrow::Datum& datum) final { + Index = 0; + ArrayValues.clear(); + if (datum.is_scalar()) { + ScalarValue = GetScalar(*datum.scalar()); + } else { + Y_VERIFY(datum.is_arraylike()); + ScalarValue = {}; + for (auto& arr : datum.chunks()) { + ArrayValues.push_back(arr->data()); + } + } + } + + TMaybe<NUdf::TUnboxedValuePod> GetNextValue() final { + if (ScalarValue.Defined()) { + return ScalarValue; + } + + TMaybe<NUdf::TUnboxedValuePod> result; + while (!ArrayValues.empty()) { + if (Index < ArrayValues.front()->length) { + result = Get(*ArrayValues.front(), Index++); + break; + } + ArrayValues.pop_front(); + Index = 0; + } + return result; + } + + virtual NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) = 0; + virtual NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) = 0; + +private: + std::deque<std::shared_ptr<arrow::ArrayData>> ArrayValues; + TMaybe<NUdf::TUnboxedValuePod> ScalarValue; + size_t Index = 0; +}; + +template <typename T> +class TFixedSizeBlockReader : public TBlockReaderBase { +public: + NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { + if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + return {}; + } + + return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]); + } + + NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + + return NUdf::TUnboxedValuePod(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); + } +}; + +class TStringBlockReader : public TBlockReaderBase { +public: + NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { + Y_VERIFY_DEBUG(data.buffers.size() == 3); + if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + return {}; + } + + arrow::util::string_view result; + if (data.type->id() == arrow::Type::BINARY) { + arrow::BinaryArray arr(std::make_shared<arrow::ArrayData>(data)); + result = arr.GetView(index); + } else { + Y_VERIFY(data.type->id() == arrow::Type::STRING); + arrow::StringArray arr(std::make_shared<arrow::ArrayData>(data)); + result = arr.GetView(index); + } + + return MakeString(NUdf::TStringRef(result.data(), result.size())); + } + + NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + + auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value; + return MakeString(NUdf::TStringRef(reinterpret_cast<const char*>(buffer->data()), buffer->size())); + } +}; + +class TTupleBlockReader : public TBlockReaderBase { +public: + TTupleBlockReader(TVector<std::unique_ptr<TBlockReaderBase>>&& children, const THolderFactory& holderFactory) + : Children(std::move(children)) + , HolderFactory(holderFactory) + {} + + NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { + if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + return {}; + } + + NUdf::TUnboxedValue* items; + auto result = Cache.NewArray(HolderFactory, Children.size(), items); + for (ui32 i = 0; i < Children.size(); ++i) { + items[i] = Children[i]->Get(*data.child_data[i], index); + } + + return result; + } + + NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + + const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); + + NUdf::TUnboxedValue* items; + auto result = Cache.NewArray(HolderFactory, Children.size(), items); + for (ui32 i = 0; i < Children.size(); ++i) { + items[i] = Children[i]->GetScalar(*structScalar.value[i]); + } + + return result; + } + +private: + TVector<std::unique_ptr<TBlockReaderBase>> Children; + const THolderFactory& HolderFactory; + TPlainContainerCache Cache; +}; + +std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolderFactory& holderFactory) { + if (type->IsOptional()) { + type = AS_TYPE(TOptionalType, type)->GetItemType(); + } + + if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + TVector<std::unique_ptr<TBlockReaderBase>> children; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + children.emplace_back(MakeBlockReaderBase(tupleType->GetElementType(i), holderFactory)); + } + + return std::make_unique<TTupleBlockReader>(std::move(children), holderFactory); + } + + if (type->IsData()) { + auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); + switch (slot) { + case NUdf::EDataSlot::Int8: + return std::make_unique<TFixedSizeBlockReader<i8>>(); + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Uint8: + return std::make_unique<TFixedSizeBlockReader<ui8>>(); + case NUdf::EDataSlot::Int16: + return std::make_unique<TFixedSizeBlockReader<i16>>(); + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + return std::make_unique<TFixedSizeBlockReader<ui16>>(); + case NUdf::EDataSlot::Int32: + return std::make_unique<TFixedSizeBlockReader<i32>>(); + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + return std::make_unique<TFixedSizeBlockReader<ui32>>(); + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + return std::make_unique<TFixedSizeBlockReader<i64>>(); + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + return std::make_unique<TFixedSizeBlockReader<ui64>>(); + case NUdf::EDataSlot::Float: + return std::make_unique<TFixedSizeBlockReader<float>>(); + case NUdf::EDataSlot::Double: + return std::make_unique<TFixedSizeBlockReader<double>>(); + case NUdf::EDataSlot::String: + case NUdf::EDataSlot::Utf8: + return std::make_unique<TStringBlockReader>(); + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } + } + + MKQL_ENSURE(false, "Unsupported type"); +} + + +} // namespace + +std::unique_ptr<IBlockReader> MakeBlockReader(TType* type, const THolderFactory& holderFactory) { + return MakeBlockReaderBase(type, holderFactory); +} + + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h new file mode 100644 index 00000000000..f4eb297770e --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h @@ -0,0 +1,22 @@ +#pragma once + +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_node.h> + +#include <arrow/datum.h> + +namespace NKikimr { +namespace NMiniKQL { + +class IBlockReader { +public: + virtual ~IBlockReader() = default; + virtual void Reset(const arrow::Datum& datum) = 0; + // for scalars will continuously return same value + virtual TMaybe<NUdf::TUnboxedValuePod> GetNextValue() = 0; +}; + +std::unique_ptr<IBlockReader> MakeBlockReader(TType* type, const THolderFactory& holderFactory); + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 96d82c583f7..daa5613e68e 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -1,4 +1,6 @@ #include "mkql_blocks.h" +#include "mkql_block_builder.h" +#include "mkql_block_reader.h" #include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> @@ -6,253 +8,47 @@ #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> -#include <arrow/array/builder_primitive.h> -#include <arrow/util/bitmap.h> -#include <arrow/util/bit_util.h> - -#include <util/generic/size_literals.h> +#include <arrow/scalar.h> namespace NKikimr { namespace NMiniKQL { namespace { -constexpr size_t MaxBlockSizeInBytes = 1_MB; - -class TBlockBuilderBase { +class TToBlocksWrapper : public TStatelessFlowComputationNode<TToBlocksWrapper> { public: - TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType, size_t maxLength) - : Ctx_(ctx) + explicit TToBlocksWrapper(IComputationNode* flow, TType* itemType) + : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed) + , Flow_(flow) , ItemType_(itemType) - , MaxLength_(maxLength) - {} - - virtual ~TBlockBuilderBase() = default; - - inline size_t MaxLength() const noexcept { - return MaxLength_; - } - - virtual void Add(const NUdf::TUnboxedValue& value) = 0; - virtual NUdf::TUnboxedValuePod Build(bool finish) = 0; - -protected: - TComputationContext& Ctx_; - const std::shared_ptr<arrow::DataType> ItemType_; - const size_t MaxLength_; -}; - -template <typename T, typename TBuilder> -class TFixedSizeBlockBuilder : public TBlockBuilderBase { -public: - TFixedSizeBlockBuilder(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType) - : TBlockBuilderBase(ctx, itemType, MaxBlockSizeInBytes / TypeSize(*itemType)) - , Builder_(std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool)) - { - this->Reserve(); - } - - void Add(const NUdf::TUnboxedValue& value) override { - Y_VERIFY_DEBUG(Builder_->length() < MaxLength_); - if (value) { - this->Builder_->UnsafeAppend(value.Get<T>()); - } else { - this->Builder_->UnsafeAppendNull(); - } - } - - NUdf::TUnboxedValuePod Build(bool finish) override { - std::shared_ptr<arrow::ArrayData> result; - ARROW_OK(this->Builder_->FinishInternal(&result)); - Builder_.reset(); - if (!finish) { - Builder_ = std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool); - Reserve(); - } - - return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result)); - } - -private: - void Reserve() { - ARROW_OK(this->Builder_->Reserve(MaxLength_)); - } - - static int64_t TypeSize(arrow::DataType& itemType) { - const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width(); - return arrow::BitUtil::BytesForBits(bits); - } - -private: - std::unique_ptr<TBuilder> Builder_; -}; - -class TTupleBlockBuilder : public TBlockBuilderBase { -public: - TTupleBlockBuilder(TComputationContext & ctx, const std::shared_ptr<arrow::DataType>& itemType, size_t maxLength, - TTupleType* tupleType, TVector<std::unique_ptr<TBlockBuilderBase>>&& children) - : TBlockBuilderBase(ctx, itemType, maxLength) - , TupleType_(tupleType) - , Children_(std::move(children)) { - bool isOptional; - MKQL_ENSURE(ConvertArrowType(TupleType_, isOptional, ArrowType_), "Unsupported type"); - Reserve(); } - void Add(const NUdf::TUnboxedValue& value) final { - if (!value) { - NullBitmapBuilder_->UnsafeAppend(false); - for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) { - Children_[i]->Add({}); - } - - return; - } + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + auto builder = MakeBlockBuilder(ItemType_, ctx.ArrowMemoryPool); - NullBitmapBuilder_->UnsafeAppend(true); - auto elements = value.GetElements(); - if (elements) { - for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) { - Children_[i]->Add(elements[i]); - } - } else { - for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) { - Children_[i]->Add(value.GetElement(i)); + for (size_t i = 0; i < builder->MaxLength(); ++i) { + auto result = Flow_->GetValue(ctx); + if (result.IsFinish() || result.IsYield()) { + if (i == 0) { + return result.Release(); + } + break; } - } - } - - NUdf::TUnboxedValuePod Build(bool finish) final { - std::vector<arrow::Datum> childrenValues; - childrenValues.reserve(TupleType_->GetElementsCount()); - for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) { - childrenValues.emplace_back(TArrowBlock::From(Children_[i]->Build(finish)).GetDatum()); + builder->Add(result); } - std::shared_ptr<arrow::Buffer> nullBitmap; - auto length = NullBitmapBuilder_->length(); - auto nullCount = NullBitmapBuilder_->false_count(); - ARROW_OK(NullBitmapBuilder_->Finish(&nullBitmap)); - auto arrayData = arrow::ArrayData::Make(ArrowType_, length, { nullBitmap }, nullCount, 0); - for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
- arrayData->child_data.push_back(childrenValues[i].array());
- }
- - arrow::Datum result(arrayData); - - NullBitmapBuilder_ = nullptr; - if (!finish) { - Reserve(); - } - - return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result)); + return builder->Build(ctx, true); } private: - void Reserve() { - NullBitmapBuilder_ = std::make_unique<arrow::TypedBufferBuilder<bool>>(&Ctx_.ArrowMemoryPool); - ARROW_OK(NullBitmapBuilder_->Reserve(MaxLength_)); + void RegisterDependencies() const final { + FlowDependsOn(Flow_); } private: - TTupleType* TupleType_; - std::shared_ptr<arrow::DataType> ArrowType_; - TVector<std::unique_ptr<TBlockBuilderBase>> Children_; - std::unique_ptr<arrow::TypedBufferBuilder<bool>> NullBitmapBuilder_; -}; - -std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, TType* type) { - if (type->IsOptional()) { - type = AS_TYPE(TOptionalType, type)->GetItemType(); - } - - if (type->IsTuple()) { - bool isOptional; - std::shared_ptr<arrow::DataType> arrowType; - MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type"); - - auto tupleType = AS_TYPE(TTupleType, type); - TVector<std::unique_ptr<TBlockBuilderBase>> children; - size_t maxLength = MaxBlockSizeInBytes; - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - children.emplace_back(MakeBlockBuilder(ctx, tupleType->GetElementType(i))); - maxLength = Min(maxLength, children.back()->MaxLength()); - } - - return std::make_unique<TTupleBlockBuilder>(ctx, arrowType, maxLength, tupleType, std::move(children)); - } - - if (type->IsData()) { - auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); - switch (slot) { - case NUdf::EDataSlot::Int8: - return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(ctx, arrow::int8()); - case NUdf::EDataSlot::Uint8: - case NUdf::EDataSlot::Bool: - return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(ctx, arrow::uint8()); - case NUdf::EDataSlot::Int16: - return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(ctx, arrow::int16()); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(ctx, arrow::uint16()); - case NUdf::EDataSlot::Int32: - return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(ctx, arrow::int32()); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(ctx, arrow::uint32()); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(ctx, arrow::int64()); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(ctx, arrow::uint64()); - case NUdf::EDataSlot::Float: - return std::make_unique<TFixedSizeBlockBuilder<float, arrow::FloatBuilder>>(ctx, arrow::float32()); - case NUdf::EDataSlot::Double: - return std::make_unique<TFixedSizeBlockBuilder<double, arrow::DoubleBuilder>>(ctx, arrow::float64()); - default: - MKQL_ENSURE(false, "Unsupported data slot"); - } - } - - MKQL_ENSURE(false, "Unsupported type"); -} - -class TToBlocksWrapper : public TStatelessFlowComputationNode<TToBlocksWrapper> {
-public:
- explicit TToBlocksWrapper(IComputationNode* flow, TType* itemType)
- : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
- , Flow_(flow)
- , ItemType_(itemType)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto builder = MakeBlockBuilder(ctx, ItemType_);
-
- for (size_t i = 0; i < builder->MaxLength(); ++i) {
- auto result = Flow_->GetValue(ctx);
- if (result.IsFinish() || result.IsYield()) {
- if (i == 0) {
- return result.Release();
- }
- break;
- }
- builder->Add(result);
- }
-
- return builder->Build(true);
- }
-
-private:
- void RegisterDependencies() const final {
- FlowDependsOn(Flow_);
- }
-
-private:
- IComputationNode* const Flow_;
- TType* ItemType_;
+ IComputationNode* const Flow_; + TType* ItemType_; }; class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> { @@ -297,7 +93,7 @@ public: for (size_t i = 0; i < Width_; ++i) { if (auto* out = output[i]; out != nullptr) { - *out = s.Builders_[i]->Build(s.IsFinished_); + *out = s.Builders_[i]->Build(ctx, s.IsFinished_); } } @@ -313,7 +109,7 @@ private: struct TState : public TComputationValue<TState> { std::vector<NUdf::TUnboxedValue> Values_; std::vector<NUdf::TUnboxedValue*> ValuePointers_; - std::vector<std::unique_ptr<TBlockBuilderBase>> Builders_; + std::vector<std::unique_ptr<IBlockBuilder>> Builders_; size_t MaxLength_; size_t Rows_ = 0; bool IsFinished_ = false; @@ -325,7 +121,7 @@ private: { for (size_t i = 0; i < types.size(); ++i) { ValuePointers_[i] = &Values_[i]; - Builders_.push_back(MakeBlockBuilder(ctx, types[i])); + Builders_.push_back(MakeBlockBuilder(types[i], ctx.ArrowMemoryPool)); } MaxLength_ = MaxBlockSizeInBytes; @@ -353,209 +149,77 @@ private: const size_t Width_; }; -class TBlockReaderBase { +class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> { public: - virtual ~TBlockReaderBase() = default; + TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, TType* itemType) + : TMutableComputationNode(mutables) + , Flow_(flow) + , ItemType_(itemType) + , StateIndex_(mutables.CurValueIndex++) + { + } - virtual NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) = 0; + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + auto& state = GetState(ctx); - virtual NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) = 0; -}; + for (;;) { + auto item = state.GetValue(); + if (item) { + return *item; + } -template <typename T> -class TFixedSizeBlockReader : public TBlockReaderBase { -public: - NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { - if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { - return {}; - } - - return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]); - } + auto input = Flow_->GetValue(ctx); + if (input.IsFinish()) { + return NUdf::TUnboxedValue::MakeFinish(); + } + if (input.IsYield()) { + return NUdf::TUnboxedValue::MakeYield(); + } - NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { - if (!scalar.is_valid) { - return {}; + state.Reset(TArrowBlock::From(input).GetDatum()); } - - return NUdf::TUnboxedValuePod(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); } -}; -class TTupleBlockReader : public TBlockReaderBase { -public: - TTupleBlockReader(TVector<std::unique_ptr<TBlockReaderBase>>&& children, const THolderFactory& holderFactory) - : Children_(std::move(children)) - , HolderFactory_(holderFactory) - {} - - NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { - if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { - return {}; - } +private: + struct TState : public TComputationValue<TState> { + using TComputationValue::TComputationValue; - NUdf::TUnboxedValue* items; - auto result = Cache_.NewArray(HolderFactory_, Children_.size(), items); - for (ui32 i = 0; i < Children_.size(); ++i) { - items[i] = Children_[i]->Get(*data.child_data[i], index); + TState(TMemoryUsageInfo* memInfo, TType* itemType, TComputationContext& ctx) + : TComputationValue(memInfo) + , Reader_(MakeBlockReader(itemType, ctx.HolderFactory)) + { } - return result; - } - - NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { - if (!scalar.is_valid) { - return {}; + TMaybe<NUdf::TUnboxedValuePod> GetValue() const { + return Reader_->GetNextValue(); } - const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); - - NUdf::TUnboxedValue* items; - auto result = Cache_.NewArray(HolderFactory_, Children_.size(), items); - for (ui32 i = 0; i < Children_.size(); ++i) { - items[i] = Children_[i]->GetScalar(*structScalar.value[i]); + void Reset(const arrow::Datum& datum) const { + MKQL_ENSURE(datum.is_arraylike(), "Expecting array as FromBlocks argument"); + Reader_->Reset(datum); } - return result; - } + private: + const std::unique_ptr<IBlockReader> Reader_; + }; private: - TVector<std::unique_ptr<TBlockReaderBase>> Children_; - const THolderFactory& HolderFactory_; - TPlainContainerCache Cache_; -}; - -std::unique_ptr<TBlockReaderBase> MakeBlockReader(TType* type, const THolderFactory& holderFactory) { - if (type->IsOptional()) { - type = AS_TYPE(TOptionalType, type)->GetItemType(); - } - - if (type->IsTuple()) { - auto tupleType = AS_TYPE(TTupleType, type); - TVector<std::unique_ptr<TBlockReaderBase>> children; - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - children.emplace_back(MakeBlockReader(tupleType->GetElementType(i), holderFactory)); - } - - return std::make_unique<TTupleBlockReader>(std::move(children), holderFactory); + void RegisterDependencies() const final { + this->DependsOn(Flow_); } - if (type->IsData()) { - auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); - switch (slot) { - case NUdf::EDataSlot::Int8: - return std::make_unique<TFixedSizeBlockReader<i8>>(); - case NUdf::EDataSlot::Bool: - case NUdf::EDataSlot::Uint8: - return std::make_unique<TFixedSizeBlockReader<ui8>>(); - case NUdf::EDataSlot::Int16: - return std::make_unique<TFixedSizeBlockReader<i16>>(); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return std::make_unique<TFixedSizeBlockReader<ui16>>(); - case NUdf::EDataSlot::Int32: - return std::make_unique<TFixedSizeBlockReader<i32>>(); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return std::make_unique<TFixedSizeBlockReader<ui32>>(); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - return std::make_unique<TFixedSizeBlockReader<i64>>(); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return std::make_unique<TFixedSizeBlockReader<ui64>>(); - case NUdf::EDataSlot::Float: - return std::make_unique<TFixedSizeBlockReader<float>>(); - case NUdf::EDataSlot::Double: - return std::make_unique<TFixedSizeBlockReader<double>>(); - default: - MKQL_ENSURE(false, "Unsupported data slot"); + TState& GetState(TComputationContext& ctx) const { + auto& result = ctx.MutableValues[StateIndex_]; + if (!result.HasValue()) { + result = ctx.HolderFactory.Create<TState>(ItemType_, ctx); } + return *static_cast<TState*>(result.AsBoxed().Get()); } - MKQL_ENSURE(false, "Unsupported type"); -} - -class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
-public:
- TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, TType* itemType)
- : TMutableComputationNode(mutables)
- , Flow_(flow)
- , ItemType_(itemType)
- , StateIndex_(mutables.CurValueIndex++)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto& state = GetState(ctx);
-
- if (state.Array_ == nullptr || state.Index_ == state.Array_->length) {
- auto result = Flow_->GetValue(ctx);
- if (result.IsFinish()) {
- return NUdf::TUnboxedValue::MakeFinish();
- }
- if (result.IsYield()) {
- return NUdf::TUnboxedValue::MakeYield();
- }
- state.Array_ = TArrowBlock::From(result).GetDatum().array();
- state.Index_ = 0;
- }
-
- const auto result = state.GetValue();
- ++state.Index_;
- return result;
- }
-
-private:
- struct TState : public TComputationValue<TState> {
- using TComputationValue::TComputationValue;
-
- TState(TMemoryUsageInfo* memInfo, TType* itemType, TComputationContext& ctx)
- : TComputationValue(memInfo)
- {
- Reader_ = MakeBlockReader(itemType, ctx.HolderFactory);
- }
-
- NUdf::TUnboxedValuePod GetValue() const {
- const auto nullCount = Array_->GetNullCount();
-
- return nullCount == Array_->length || (nullCount > 0 && !HasValue())
- ? NUdf::TUnboxedValuePod()
- : DoGetValue();
- }
-
- private:
- NUdf::TUnboxedValuePod DoGetValue() const {
- return Reader_->Get(*Array_, Index_);
- }
-
- bool HasValue() const {
- return arrow::BitUtil::GetBit(Array_->GetValues<uint8_t>(0, 0), Index_ + Array_->offset);
- }
-
- std::unique_ptr<TBlockReaderBase> Reader_;
- public:
- std::shared_ptr<arrow::ArrayData> Array_{ nullptr };
- size_t Index_{ 0 };
- };
-
-private:
- void RegisterDependencies() const final {
- this->DependsOn(Flow_);
- }
-
- TState& GetState(TComputationContext& ctx) const {
- auto& result = ctx.MutableValues[StateIndex_];
- if (!result.HasValue()) {
- result = ctx.HolderFactory.Create<TState>(ItemType_, ctx);
- }
- return *static_cast<TState*>(result.AsBoxed().Get());
- }
-
-private:
- IComputationNode* const Flow_;
- TType* ItemType_;
- const ui32 StateIndex_;
+private: + IComputationNode* const Flow_; + TType* ItemType_; + const ui32 StateIndex_; }; class TWideFromBlocksWrapper : public TStatefulWideFlowComputationNode<TWideFromBlocksWrapper> { @@ -575,12 +239,7 @@ public: NUdf::TUnboxedValue*const* output) const { auto& s = GetState(state, ctx); - while (s.Index_ == s.Count_) { - for (size_t i = 0; i < Width_; ++i) { - s.Arrays_[i] = nullptr; - s.Scalars_[i] = nullptr; - } - + if (s.Index_ == s.Count_) { auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); if (result != EFetchResult::One) { return result; @@ -589,11 +248,7 @@ public: s.Index_ = 0; for (size_t i = 0; i < Width_; ++i) { const auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum(); - if (datum.is_scalar()) { - s.Scalars_[i] = datum.scalar(); - } else { - s.Arrays_[i] = datum.array(); - } + s.Readers_[i]->Reset(datum); } s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; @@ -604,22 +259,9 @@ public: continue; } - const auto& array = s.Arrays_[i]; - if (array) { - const auto nullCount = array->GetNullCount(); - if (nullCount == array->length || (nullCount > 0 && !arrow::BitUtil::GetBit(array->GetValues<uint8_t>(0, 0), s.Index_ + array->offset))) { - *(output[i]) = NUdf::TUnboxedValue(); - } else { - *(output[i]) = s.Readers_[i]->Get(*array, s.Index_); - } - } else { - const auto& scalar = s.Scalars_[i]; - if (!scalar->is_valid) { - *(output[i]) = NUdf::TUnboxedValue(); - } else { - *(output[i]) = s.Readers_[i]->GetScalar(*scalar); - } - } + auto result = s.Readers_[i]->GetNextValue(); + Y_VERIFY(result); + *(output[i]) = *result; } ++s.Index_; @@ -630,9 +272,7 @@ private: struct TState : public TComputationValue<TState> { TVector<NUdf::TUnboxedValue> Values_; TVector<NUdf::TUnboxedValue*> ValuePointers_; - TVector<std::shared_ptr<arrow::ArrayData>> Arrays_; - TVector<std::shared_ptr<arrow::Scalar>> Scalars_; - TVector<std::unique_ptr<TBlockReaderBase>> Readers_; + TVector<std::unique_ptr<IBlockReader>> Readers_; size_t Count_ = 0; size_t Index_ = 0; @@ -640,8 +280,6 @@ private: : TComputationValue(memInfo) , Values_(types.size() + 1) , ValuePointers_(types.size() + 1) - , Arrays_(types.size()) - , Scalars_(types.size()) { for (size_t i = 0; i < types.size() + 1; ++i) { ValuePointers_[i] = &Values_[i]; @@ -685,7 +323,7 @@ public: NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { auto value = Arg_->GetValue(ctx); - arrow::Datum result = ConvertScalar(Type_, value); + arrow::Datum result = ConvertScalar(Type_, value, ctx); return ctx.HolderFactory.CreateArrowBlock(std::move(result)); } @@ -694,7 +332,7 @@ private: DependsOn(Arg_); } - arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value) const { + arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, TComputationContext& ctx) const { if (!value) { bool isOptional; std::shared_ptr<arrow::DataType> arrowType; @@ -714,7 +352,7 @@ private: std::vector<std::shared_ptr<arrow::Scalar>> arrowValue; for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - arrowValue.emplace_back(ConvertScalar(tupleType->GetElementType(i), value.GetElement(i)).scalar()); + arrowValue.emplace_back(ConvertScalar(tupleType->GetElementType(i), value.GetElement(i), ctx).scalar()); } return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType)); @@ -748,6 +386,15 @@ private: return arrow::Datum(static_cast<float>(value.Get<float>())); case NUdf::EDataSlot::Double: return arrow::Datum(static_cast<double>(value.Get<double>())); + case NUdf::EDataSlot::String: + case NUdf::EDataSlot::Utf8: { + const auto& str = value.AsStringRef(); + std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(str.Size(), &ctx.ArrowMemoryPool))); + std::memcpy(buffer->mutable_data(), str.Data(), str.Size()); + auto type = (slot == NUdf::EDataSlot::String) ? arrow::binary() : arrow::utf8(); + std::shared_ptr<arrow::Scalar> scalar = std::make_shared<arrow::BinaryScalar>(buffer, type); + return arrow::Datum(scalar); + } default: MKQL_ENSURE(false, "Unsupported data slot"); } @@ -761,13 +408,13 @@ private: TType* Type_; }; -} +} // namespace -IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), flowType->GetItemType());
-}
+IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), flowType->GetItemType()); +} IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index b04bc43ec35..dba908a7069 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -1350,6 +1350,12 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty case NUdf::EDataSlot::Double: type = arrow::float64(); return true; + case NUdf::EDataSlot::String: + type = arrow::binary(); + return true; + case NUdf::EDataSlot::Utf8: + type = arrow::utf8(); + return true; default: return false; } |