aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2022-12-20 18:13:56 +0300
committeraneporada <aneporada@ydb.tech>2022-12-20 18:13:56 +0300
commitf37198778abfc9966a20b45117657a92c31d0e17 (patch)
tree717d17623fc3119dea8172f4652c3bd83ad040c5
parent1e90d439ad65bba680872176ad4ad3b3f87224d1 (diff)
downloadydb-f37198778abfc9966a20b45117657a92c31d0e17.tar.gz
Initial support for strings in blocks: BlockReader/BlockBuilder
-rw-r--r--ydb/library/yql/ast/yql_expr.h22
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_wide.cpp16
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.cpp22
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.h6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp462
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h27
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp216
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h22
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp549
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.cpp6
13 files changed, 886 insertions, 468 deletions
diff --git a/ydb/library/yql/ast/yql_expr.h b/ydb/library/yql/ast/yql_expr.h
index 7ec95914503..6c77bbb30f9 100644
--- a/ydb/library/yql/ast/yql_expr.h
+++ b/ydb/library/yql/ast/yql_expr.h
@@ -132,7 +132,8 @@ enum ETypeAnnotationFlags : ui32 {
TypeHasManyValues = 0x200,
TypeHasBareYson = 0x400,
TypeHasNestedOptional = 0x800,
- TypeNonPresortable = 0x1000
+ TypeNonPresortable = 0x1000,
+ TypeHasDynamicSize = 0x2000,
};
const ui64 TypeHashMagic = 0x10000;
@@ -235,6 +236,15 @@ public:
return kind == ETypeAnnotationKind::Optional || kind == ETypeAnnotationKind::Null || kind == ETypeAnnotationKind::Pg;
}
+ bool IsAnyBlockOrScalar() const {
+ auto kind = GetKind();
+ return kind == ETypeAnnotationKind::Block || kind == ETypeAnnotationKind::ChunkedBlock || kind == ETypeAnnotationKind::Scalar;
+ }
+
+ bool HasFixedSizeRepr() const {
+ return (GetFlags() & (TypeHasDynamicSize | TypeNonPersistable | TypeNonComputable)) == 0;
+ }
+
bool IsSingleton() const {
return (GetFlags() & TypeHasManyValues) == 0;
}
@@ -551,7 +561,7 @@ public:
static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::List;
TListExprType(ui64 hash, const TTypeAnnotationNode* itemType)
- : TTypeAnnotationNode(KindValue, itemType->GetFlags(), hash)
+ : TTypeAnnotationNode(KindValue, itemType->GetFlags() | TypeHasDynamicSize, hash)
, ItemType(itemType)
{
}
@@ -737,6 +747,10 @@ public:
ret |= TypeHasBareYson;
}
+ if (props & NUdf::StringType) {
+ ret |= TypeHasDynamicSize;
+ }
+
return ret;
}
@@ -801,6 +815,7 @@ class TPgExprType : public TTypeAnnotationNode {
public:
static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Pg;
+ // TODO: TypeHasDynamicSize for Pg types
TPgExprType(ui64 hash, ui32 typeId)
: TTypeAnnotationNode(KindValue, GetFlags(typeId), hash)
, TypeId(typeId)
@@ -953,7 +968,8 @@ public:
static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Dict;
TDictExprType(ui64 hash, const TTypeAnnotationNode* keyType, const TTypeAnnotationNode* payloadType)
- : TTypeAnnotationNode(KindValue, TypeNonComparable | keyType->GetFlags() | payloadType->GetFlags(), hash)
+ : TTypeAnnotationNode(KindValue, TypeNonComparable | TypeHasDynamicSize |
+ keyType->GetFlags() | payloadType->GetFlags(), hash)
, KeyType(keyType)
, PayloadType(payloadType)
{
diff --git a/ydb/library/yql/core/type_ann/type_ann_wide.cpp b/ydb/library/yql/core/type_ann/type_ann_wide.cpp
index 6e51ff57c79..a60ed79c840 100644
--- a/ydb/library/yql/core/type_ann/type_ann_wide.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_wide.cpp
@@ -638,12 +638,21 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
TTypeAnnotationNode::TListType retMultiType;
for (const auto& type : multiType->GetItems()) {
- if (type->GetKind() == ETypeAnnotationKind::Block || type->GetKind() == ETypeAnnotationKind::Scalar) {
+ if (type->IsAnyBlockOrScalar()) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar"));
return IGraphTransformer::TStatus::Error;
}
- retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type));
+ if (!EnsurePersistableType(input->Pos(), *type, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (type->HasFixedSizeRepr()) {
+ retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type));
+ } else {
+ retMultiType.push_back(ctx.Expr.MakeType<TChunkedBlockExprType>(type));
+ }
+
}
retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
@@ -659,7 +668,8 @@ IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, T
}
TTypeAnnotationNode::TListType retMultiType;
- if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) {
+ const bool allowChunked = true;
+ if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr, allowChunked)) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 008dcc700e9..07688adebf0 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -2697,7 +2697,7 @@ bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& typ
return true;
}
-bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx) {
+bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowChunked) {
if (!EnsureWideFlowType(node, ctx)) {
return false;
}
@@ -2710,7 +2710,7 @@ bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListTy
bool isScalar;
for (const auto& type : items) {
- if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx)) {
+ if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx, allowChunked)) {
return false;
}
@@ -5328,18 +5328,21 @@ bool HasContextFuncs(const TExprNode& input) {
return needCtx;
}
-bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx) {
+bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx, bool allowChunked) {
if (HasError(node.GetTypeAnn(), ctx) || !node.GetTypeAnn()) {
YQL_ENSURE(node.Type() == TExprNode::Lambda);
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected block or scalar type, but got lambda"));
return false;
}
- return EnsureBlockOrScalarType(node.Pos(), *node.GetTypeAnn(), ctx);
+ return EnsureBlockOrScalarType(node.Pos(), *node.GetTypeAnn(), ctx, allowChunked);
}
-bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx) {
- if (HasError(&type, ctx) || (type.GetKind() != ETypeAnnotationKind::Block && type.GetKind() != ETypeAnnotationKind::Scalar)) {
+bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx, bool allowChunked) {
+ bool valid = allowChunked ? type.IsAnyBlockOrScalar() :
+ (type.GetKind() == ETypeAnnotationKind::Block ||
+ type.GetKind() == ETypeAnnotationKind::Scalar);
+ if (HasError(&type, ctx) || !valid) {
ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected block or scalar type, but got: " << type));
return false;
}
@@ -5348,11 +5351,14 @@ bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode
}
const TTypeAnnotationNode* GetBlockItemType(const TTypeAnnotationNode& type, bool& isScalar) {
- auto kind = type.GetKind();
- YQL_ENSURE(kind == ETypeAnnotationKind::Block || kind == ETypeAnnotationKind::Scalar);
+ YQL_ENSURE(type.IsAnyBlockOrScalar());
+ const auto kind = type.GetKind();
if (kind == ETypeAnnotationKind::Block) {
isScalar = false;
return type.Cast<TBlockExprType>()->GetItemType();
+ } else if (kind == ETypeAnnotationKind::ChunkedBlock) {
+ isScalar = false;
+ return type.Cast<TChunkedBlockExprType>()->GetItemType();
} else {
isScalar = true;
return type.Cast<TScalarExprType>()->GetItemType();
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index e6a19bb7238..9a0fa776425 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -118,7 +118,7 @@ bool EnsureFlowType(const TExprNode& node, TExprContext& ctx);
bool EnsureFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureWideFlowType(const TExprNode& node, TExprContext& ctx);
bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
-bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx);
+bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowChunked = false);
bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx);
bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureType(const TExprNode& node, TExprContext& ctx);
@@ -297,8 +297,8 @@ bool IsCallableTypeHasStreams(const TCallableExprType* callableType);
bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx);
bool HasContextFuncs(const TExprNode& input);
-bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx);
-bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
+bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx, bool allowChunked = false);
+bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx, bool allowChunked = false);
const TTypeAnnotationNode* GetBlockItemType(const TTypeAnnotationNode& type, bool& isScalar);
const TTypeAnnotationNode* AggApplySerializedStateType(const TExprNode::TPtr& input, TExprContext& ctx);
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
index bac7a8573ad..2e6b5588c39 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
@@ -40,10 +40,12 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
index 30b1ee690cc..303a6c6b5e3 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
@@ -41,10 +41,12 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
index 30b1ee690cc..303a6c6b5e3 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
@@ -41,10 +41,12 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_logical.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
new file mode 100644
index 00000000000..958f421f2b3
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
@@ -0,0 +1,462 @@
+#include "mkql_block_builder.h"
+
+#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_type_builder.h>
+
+#include <arrow/array/builder_primitive.h>
+#include <arrow/chunked_array.h>
+
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+bool AlwaysUseChunks(const TType* type) {
+ if (type->IsOptional()) {
+ type = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
+
+ if (type->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, type);
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ if (AlwaysUseChunks(tupleType->GetElementType(i))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ if (type->IsData()) {
+ auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
+ return (GetDataTypeInfo(slot).Features & NYql::NUdf::EDataTypeFeatures::StringType) != 0u;
+ }
+
+ MKQL_ENSURE(false, "Unsupported type");
+}
+
+// size of each block item in bytes
+size_t CalcItemSize(const TType* type) {
+ // we do not count block bitmap size
+ if (type->IsOptional()) {
+ type = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
+
+ if (type->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, type);
+ size_t result = 0;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ result += CalcItemSize(tupleType->GetElementType(i));
+ }
+ return result;
+ }
+
+ if (type->IsData()) {
+ auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
+ switch (slot) {
+ case NUdf::EDataSlot::Int8:
+ case NUdf::EDataSlot::Uint8:
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Int16:
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ case NUdf::EDataSlot::Int32:
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ case NUdf::EDataSlot::Float:
+ case NUdf::EDataSlot::Double: {
+ size_t sz = GetDataTypeInfo(slot).FixedSize;
+ MKQL_ENSURE(sz > 0, "Unexpected fixed data size");
+ return sz;
+ }
+ case NUdf::EDataSlot::String:
+ // size of offset part
+ return sizeof(arrow::BinaryType::offset_type);
+ case NUdf::EDataSlot::Utf8:
+ // size of offset part
+ return sizeof(arrow::StringType::offset_type);
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
+ }
+
+ MKQL_ENSURE(false, "Unsupported type");
+}
+
+std::shared_ptr<arrow::DataType> GetArrowType(TType* type) {
+ std::shared_ptr<arrow::DataType> result;
+ bool isOptional;
+ Y_VERIFY(ConvertArrowType(type, isOptional, result));
+ return result;
+}
+
+class TBlockBuilderBase : public IBlockBuilder {
+public:
+ using Ptr = std::unique_ptr<TBlockBuilderBase>;
+
+ struct TBlockArrayTree {
+ using Ptr = std::shared_ptr<TBlockArrayTree>;
+ std::deque<std::shared_ptr<arrow::ArrayData>> Payload;
+ std::vector<TBlockArrayTree::Ptr> Children;
+ };
+
+
+ TBlockBuilderBase(TType* type, arrow::MemoryPool& pool, size_t maxLen)
+ : Type(type)
+ , Pool(&pool)
+ , MaxLen(maxLen)
+ {
+ Y_VERIFY(type);
+ Y_VERIFY(maxLen > 0);
+ }
+
+ size_t MaxLength() const final {
+ return MaxLen;
+ }
+
+ void Add(const NUdf::TUnboxedValue& value) final {
+ Y_VERIFY(CurrLen < MaxLen);
+ DoAdd(value);
+ CurrLen++;
+ }
+
+ NUdf::TUnboxedValuePod Build(TComputationContext& ctx, bool finish) final {
+ auto tree = BuildTree(finish);
+ CurrLen = 0;
+ arrow::ArrayVector chunks;
+ while (size_t size = CalcSliceSize(*tree)) {
+ std::shared_ptr<arrow::ArrayData> data = Slice(*tree, size);
+ chunks.push_back(arrow::Datum(data).make_array());
+ }
+
+ Y_VERIFY(!chunks.empty());
+
+ if (chunks.size() > 1 || AlwaysUseChunks(Type)) {
+ auto chunked = ARROW_RESULT(arrow::ChunkedArray::Make(std::move(chunks), GetArrowType(Type)));
+ return ctx.HolderFactory.CreateArrowBlock(std::move(chunked));
+ }
+ return ctx.HolderFactory.CreateArrowBlock(chunks.front());
+ }
+
+ virtual TBlockArrayTree::Ptr BuildTree(bool finish) = 0;
+ virtual void DoAdd(const NUdf::TUnboxedValue& value) = 0;
+
+private:
+ static size_t CalcSliceSize(const TBlockArrayTree& tree) {
+ if (tree.Payload.empty()) {
+ return 0;
+ }
+
+ if (!tree.Children.empty()) {
+ Y_VERIFY(tree.Payload.size() == 1);
+ size_t result = std::numeric_limits<size_t>::max();
+ for (auto& child : tree.Children) {
+ size_t childSize = CalcSliceSize(*child);
+ result = std::min(result, childSize);
+ }
+ Y_VERIFY(result <= tree.Payload.front()->length);
+ return result;
+ }
+
+ int64_t result = std::numeric_limits<int64_t>::max();
+ for (auto& data : tree.Payload) {
+ result = std::min(result, data->length);
+ }
+
+ Y_VERIFY(result > 0);
+ return static_cast<size_t>(result);
+ }
+
+ static std::shared_ptr<arrow::ArrayData> Slice(TBlockArrayTree& tree, size_t size) {
+ Y_VERIFY(size > 0);
+
+ Y_VERIFY(!tree.Payload.empty());
+ auto& main = tree.Payload.front();
+ std::shared_ptr<arrow::ArrayData> sliced;
+ if (size == main->length) {
+ sliced = main;
+ tree.Payload.pop_front();
+ } else {
+ Y_VERIFY(size < main->length);
+ sliced = main->Slice(0, size);
+ main = main->Slice(size, main->length - size);
+ }
+
+ if (!tree.Children.empty()) {
+ std::vector<std::shared_ptr<arrow::ArrayData>> children;
+ for (auto& child : tree.Children) {
+ children.push_back(Slice(*child, size));
+ }
+
+ sliced->child_data = std::move(children);
+ if (tree.Payload.empty()) {
+ tree.Children.clear();
+ }
+ }
+ return sliced;
+ }
+
+protected:
+ TType* const Type;
+ arrow::MemoryPool* const Pool;
+ const size_t MaxLen;
+private:
+ size_t CurrLen = 0;
+};
+
+template <typename T, typename TBuilder>
+class TFixedSizeBlockBuilder : public TBlockBuilderBase {
+public:
+ TFixedSizeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen)
+ : TBlockBuilderBase(type, pool, maxLen)
+ , Builder(std::make_unique<TBuilder>(&pool))
+ {
+ Reserve();
+ }
+
+ void DoAdd(const NUdf::TUnboxedValue& value) final {
+ if (value) {
+ Builder->UnsafeAppend(value.Get<T>());
+ } else {
+ Builder->UnsafeAppendNull();
+ }
+ }
+
+ TBlockArrayTree::Ptr BuildTree(bool finish) final {
+ TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
+ result->Payload.emplace_back();
+ ARROW_OK(Builder->FinishInternal(&result->Payload.back()));
+ Builder.reset();
+ if (!finish) {
+ Builder = std::make_unique<TBuilder>(Pool);
+ Reserve();
+ }
+ return result;
+ }
+
+private:
+ void Reserve() {
+ ARROW_OK(Builder->Reserve(MaxLen));
+ }
+
+ std::unique_ptr<TBuilder> Builder;
+};
+
+template<typename TStringType>
+class TStringBlockBuilder : public TBlockBuilderBase {
+public:
+ TStringBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen)
+ : TBlockBuilderBase(type, pool, maxLen)
+ {
+ Reserve();
+ }
+
+ void DoAdd(const NUdf::TUnboxedValue& value) final {
+ if (!value) {
+ NullBitmapBuilder->UnsafeAppend(false);
+ AppendCurrentOffset();
+ NullCount++;
+ return;
+ }
+
+ const TStringBuf str = value.AsStringRef();
+
+ size_t currentLen = DataBuilder->length();
+ Y_VERIFY(currentLen <= MaxBlockSizeInBytes);
+ if (str.size() > (MaxBlockSizeInBytes - currentLen)) {
+ MKQL_ENSURE(str.size() < std::numeric_limits<typename TStringType::offset_type>::max(), "Too big string for Arrow");
+ FlushChunk(false);
+ }
+
+ NullBitmapBuilder->UnsafeAppend(true);
+ AppendCurrentOffset();
+ DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size());
+ }
+
+ TBlockArrayTree::Ptr BuildTree(bool finish) final {
+ FlushChunk(finish);
+ TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
+ result->Payload = std::move(Chunks);
+ Chunks.clear();
+ return result;
+ }
+
+private:
+ void Reserve() {
+ NullBitmapBuilder = std::make_unique<arrow::TypedBufferBuilder<bool>>(Pool);
+ OffsetsBuilder = std::make_unique<arrow::TypedBufferBuilder<typename TStringType::offset_type>>(Pool);
+ DataBuilder = std::make_unique<arrow::TypedBufferBuilder<ui8>>(Pool);
+ ARROW_OK(NullBitmapBuilder->Reserve(MaxLen));
+ ARROW_OK(OffsetsBuilder->Reserve(MaxLen + 1));
+ ARROW_OK(DataBuilder->Reserve(MaxBlockSizeInBytes));
+ NullCount = 0;
+ }
+
+ void AppendCurrentOffset() {
+ OffsetsBuilder->UnsafeAppend(DataBuilder->length());
+ }
+
+ void FlushChunk(bool finish) {
+ const auto length = NullBitmapBuilder->length();
+ Y_VERIFY(length > 0);
+
+ AppendCurrentOffset();
+ std::shared_ptr<arrow::Buffer> nullBitmap;
+ std::shared_ptr<arrow::Buffer> offsets;
+ std::shared_ptr<arrow::Buffer> data;
+ if (NullCount) {
+ ARROW_OK(NullBitmapBuilder->Finish(&nullBitmap));
+ }
+ ARROW_OK(OffsetsBuilder->Finish(&offsets));
+ ARROW_OK(DataBuilder->Finish(&data));
+ auto arrowType = std::make_shared<TStringType>();
+ Chunks.push_back(arrow::ArrayData::Make(std::make_shared<TStringType>(), length,
+ { nullBitmap, offsets, data }, NullCount, 0));
+ if (!finish) {
+ Reserve();
+ }
+ }
+
+ std::unique_ptr<arrow::TypedBufferBuilder<bool>> NullBitmapBuilder;
+ std::unique_ptr<arrow::TypedBufferBuilder<typename TStringType::offset_type>> OffsetsBuilder;
+ std::unique_ptr<arrow::TypedBufferBuilder<ui8>> DataBuilder;
+ std::deque<std::shared_ptr<arrow::ArrayData>> Chunks;
+ size_t NullCount = 0;
+};
+
+class TTupleBlockBuilder : public TBlockBuilderBase {
+public:
+ TTupleBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen,
+ TVector<TBlockBuilderBase::Ptr>&& children)
+ : TBlockBuilderBase(type, pool, maxLen)
+ , Children(std::move(children))
+ {
+ Reserve();
+ }
+
+ void DoAdd(const NUdf::TUnboxedValue& value) final {
+ auto tupleType = AS_TYPE(TTupleType, Type);
+ if (!value) {
+ NullBitmapBuilder->UnsafeAppend(false);
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ Children[i]->DoAdd({});
+ }
+ return;
+ }
+
+ NullBitmapBuilder->UnsafeAppend(true);
+ auto elements = value.GetElements();
+ if (elements) {
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ Children[i]->DoAdd(elements[i]);
+ }
+ } else {
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ Children[i]->DoAdd(value.GetElement(i));
+ }
+ }
+ }
+
+ TBlockArrayTree::Ptr BuildTree(bool finish) final {
+ auto tupleType = AS_TYPE(TTupleType, Type);
+ TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
+
+ std::shared_ptr<arrow::Buffer> nullBitmap;
+ auto length = NullBitmapBuilder->length();
+ ARROW_OK(NullBitmapBuilder->Finish(&nullBitmap));
+ result->Payload.push_back(arrow::ArrayData::Make(GetArrowType(Type), length, { nullBitmap }));
+ result->Children.reserve(tupleType->GetElementsCount());
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ result->Children.emplace_back(Children[i]->BuildTree(finish));
+ }
+
+ if (!finish) {
+ Reserve();
+ }
+
+ return result;
+ }
+
+private:
+ void Reserve() {
+ NullBitmapBuilder = std::make_unique<arrow::TypedBufferBuilder<bool>>(Pool);
+ ARROW_OK(NullBitmapBuilder->Reserve(MaxLen));
+ }
+
+private:
+ TVector<std::unique_ptr<TBlockBuilderBase>> Children;
+ std::unique_ptr<arrow::TypedBufferBuilder<bool>> NullBitmapBuilder;
+};
+
+std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool, size_t maxLen) {
+ if (type->IsOptional()) {
+ type = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
+
+ if (type->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, type);
+ TVector<std::unique_ptr<TBlockBuilderBase>> children;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ children.emplace_back(MakeBlockBuilder(tupleType->GetElementType(i), pool, maxLen));
+ }
+
+ return std::make_unique<TTupleBlockBuilder>(tupleType, pool, maxLen, std::move(children));
+ }
+
+ if (type->IsData()) {
+ auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
+ switch (slot) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Uint8:
+ case NUdf::EDataSlot::Bool:
+ return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Float:
+ return std::make_unique<TFixedSizeBlockBuilder<float, arrow::FloatBuilder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Double:
+ return std::make_unique<TFixedSizeBlockBuilder<double, arrow::DoubleBuilder>>(type, pool, maxLen);
+ case NUdf::EDataSlot::String:
+ return std::make_unique<TStringBlockBuilder<arrow::BinaryType>>(type, pool, maxLen);
+ case NUdf::EDataSlot::Utf8:
+ return std::make_unique<TStringBlockBuilder<arrow::StringType>>(type, pool, maxLen);
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
+ }
+
+ MKQL_ENSURE(false, "Unsupported type");
+}
+
+} // namespace
+
+std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool) {
+ const auto itemSize = std::max<size_t>(CalcItemSize(type), 1);
+ const auto maxLen = std::max<size_t>(MaxBlockSizeInBytes / itemSize, 1);
+ return MakeBlockBuilder(type, pool, maxLen);
+}
+
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h
new file mode 100644
index 00000000000..fb64a619e06
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+
+#include <util/generic/size_literals.h>
+
+#include <limits>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+constexpr size_t MaxBlockSizeInBytes = 1_MB;
+static_assert(MaxBlockSizeInBytes < (size_t)std::numeric_limits<i32>::max());
+
+class IBlockBuilder {
+public:
+ virtual ~IBlockBuilder() = default;
+ virtual size_t MaxLength() const = 0;
+ virtual void Add(const NUdf::TUnboxedValue& value) = 0;
+ virtual NUdf::TUnboxedValuePod Build(TComputationContext& ctx, bool finish) = 0;
+};
+
+std::unique_ptr<IBlockBuilder> MakeBlockBuilder(TType* type, arrow::MemoryPool& pool);
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
new file mode 100644
index 00000000000..a39c38d2cd9
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
@@ -0,0 +1,216 @@
+#include "mkql_block_reader.h"
+
+#include <ydb/library/yql/minikql/mkql_string_util.h>
+#include <ydb/library/yql/minikql/mkql_node_builder.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+
+#include <arrow/array/array_binary.h>
+#include <arrow/chunked_array.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+class TBlockReaderBase : public IBlockReader {
+public:
+ using Ptr = std::unique_ptr<TBlockReaderBase>;
+
+ void Reset(const arrow::Datum& datum) final {
+ Index = 0;
+ ArrayValues.clear();
+ if (datum.is_scalar()) {
+ ScalarValue = GetScalar(*datum.scalar());
+ } else {
+ Y_VERIFY(datum.is_arraylike());
+ ScalarValue = {};
+ for (auto& arr : datum.chunks()) {
+ ArrayValues.push_back(arr->data());
+ }
+ }
+ }
+
+ TMaybe<NUdf::TUnboxedValuePod> GetNextValue() final {
+ if (ScalarValue.Defined()) {
+ return ScalarValue;
+ }
+
+ TMaybe<NUdf::TUnboxedValuePod> result;
+ while (!ArrayValues.empty()) {
+ if (Index < ArrayValues.front()->length) {
+ result = Get(*ArrayValues.front(), Index++);
+ break;
+ }
+ ArrayValues.pop_front();
+ Index = 0;
+ }
+ return result;
+ }
+
+ virtual NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) = 0;
+ virtual NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) = 0;
+
+private:
+ std::deque<std::shared_ptr<arrow::ArrayData>> ArrayValues;
+ TMaybe<NUdf::TUnboxedValuePod> ScalarValue;
+ size_t Index = 0;
+};
+
+template <typename T>
+class TFixedSizeBlockReader : public TBlockReaderBase {
+public:
+ NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
+ if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ return {};
+ }
+
+ return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]);
+ }
+
+ NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ if (!scalar.is_valid) {
+ return {};
+ }
+
+ return NUdf::TUnboxedValuePod(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
+ }
+};
+
+class TStringBlockReader : public TBlockReaderBase {
+public:
+ NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
+ Y_VERIFY_DEBUG(data.buffers.size() == 3);
+ if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ return {};
+ }
+
+ arrow::util::string_view result;
+ if (data.type->id() == arrow::Type::BINARY) {
+ arrow::BinaryArray arr(std::make_shared<arrow::ArrayData>(data));
+ result = arr.GetView(index);
+ } else {
+ Y_VERIFY(data.type->id() == arrow::Type::STRING);
+ arrow::StringArray arr(std::make_shared<arrow::ArrayData>(data));
+ result = arr.GetView(index);
+ }
+
+ return MakeString(NUdf::TStringRef(result.data(), result.size()));
+ }
+
+ NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ if (!scalar.is_valid) {
+ return {};
+ }
+
+ auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
+ return MakeString(NUdf::TStringRef(reinterpret_cast<const char*>(buffer->data()), buffer->size()));
+ }
+};
+
+class TTupleBlockReader : public TBlockReaderBase {
+public:
+ TTupleBlockReader(TVector<std::unique_ptr<TBlockReaderBase>>&& children, const THolderFactory& holderFactory)
+ : Children(std::move(children))
+ , HolderFactory(holderFactory)
+ {}
+
+ NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
+ if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ return {};
+ }
+
+ NUdf::TUnboxedValue* items;
+ auto result = Cache.NewArray(HolderFactory, Children.size(), items);
+ for (ui32 i = 0; i < Children.size(); ++i) {
+ items[i] = Children[i]->Get(*data.child_data[i], index);
+ }
+
+ return result;
+ }
+
+ NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ if (!scalar.is_valid) {
+ return {};
+ }
+
+ const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
+
+ NUdf::TUnboxedValue* items;
+ auto result = Cache.NewArray(HolderFactory, Children.size(), items);
+ for (ui32 i = 0; i < Children.size(); ++i) {
+ items[i] = Children[i]->GetScalar(*structScalar.value[i]);
+ }
+
+ return result;
+ }
+
+private:
+ TVector<std::unique_ptr<TBlockReaderBase>> Children;
+ const THolderFactory& HolderFactory;
+ TPlainContainerCache Cache;
+};
+
+std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolderFactory& holderFactory) {
+ if (type->IsOptional()) {
+ type = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
+
+ if (type->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, type);
+ TVector<std::unique_ptr<TBlockReaderBase>> children;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ children.emplace_back(MakeBlockReaderBase(tupleType->GetElementType(i), holderFactory));
+ }
+
+ return std::make_unique<TTupleBlockReader>(std::move(children), holderFactory);
+ }
+
+ if (type->IsData()) {
+ auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
+ switch (slot) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TFixedSizeBlockReader<i8>>();
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TFixedSizeBlockReader<ui8>>();
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TFixedSizeBlockReader<i16>>();
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ return std::make_unique<TFixedSizeBlockReader<ui16>>();
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TFixedSizeBlockReader<i32>>();
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ return std::make_unique<TFixedSizeBlockReader<ui32>>();
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ return std::make_unique<TFixedSizeBlockReader<i64>>();
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ return std::make_unique<TFixedSizeBlockReader<ui64>>();
+ case NUdf::EDataSlot::Float:
+ return std::make_unique<TFixedSizeBlockReader<float>>();
+ case NUdf::EDataSlot::Double:
+ return std::make_unique<TFixedSizeBlockReader<double>>();
+ case NUdf::EDataSlot::String:
+ case NUdf::EDataSlot::Utf8:
+ return std::make_unique<TStringBlockReader>();
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
+ }
+
+ MKQL_ENSURE(false, "Unsupported type");
+}
+
+
+} // namespace
+
+std::unique_ptr<IBlockReader> MakeBlockReader(TType* type, const THolderFactory& holderFactory) {
+ return MakeBlockReaderBase(type, holderFactory);
+}
+
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h
new file mode 100644
index 00000000000..f4eb297770e
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+
+#include <arrow/datum.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+class IBlockReader {
+public:
+ virtual ~IBlockReader() = default;
+ virtual void Reset(const arrow::Datum& datum) = 0;
+ // for scalars will continuously return same value
+ virtual TMaybe<NUdf::TUnboxedValuePod> GetNextValue() = 0;
+};
+
+std::unique_ptr<IBlockReader> MakeBlockReader(TType* type, const THolderFactory& holderFactory);
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 96d82c583f7..daa5613e68e 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -1,4 +1,6 @@
#include "mkql_blocks.h"
+#include "mkql_block_builder.h"
+#include "mkql_block_reader.h"
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
@@ -6,253 +8,47 @@
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
-#include <arrow/array/builder_primitive.h>
-#include <arrow/util/bitmap.h>
-#include <arrow/util/bit_util.h>
-
-#include <util/generic/size_literals.h>
+#include <arrow/scalar.h>
namespace NKikimr {
namespace NMiniKQL {
namespace {
-constexpr size_t MaxBlockSizeInBytes = 1_MB;
-
-class TBlockBuilderBase {
+class TToBlocksWrapper : public TStatelessFlowComputationNode<TToBlocksWrapper> {
public:
- TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType, size_t maxLength)
- : Ctx_(ctx)
+ explicit TToBlocksWrapper(IComputationNode* flow, TType* itemType)
+ : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
+ , Flow_(flow)
, ItemType_(itemType)
- , MaxLength_(maxLength)
- {}
-
- virtual ~TBlockBuilderBase() = default;
-
- inline size_t MaxLength() const noexcept {
- return MaxLength_;
- }
-
- virtual void Add(const NUdf::TUnboxedValue& value) = 0;
- virtual NUdf::TUnboxedValuePod Build(bool finish) = 0;
-
-protected:
- TComputationContext& Ctx_;
- const std::shared_ptr<arrow::DataType> ItemType_;
- const size_t MaxLength_;
-};
-
-template <typename T, typename TBuilder>
-class TFixedSizeBlockBuilder : public TBlockBuilderBase {
-public:
- TFixedSizeBlockBuilder(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType)
- : TBlockBuilderBase(ctx, itemType, MaxBlockSizeInBytes / TypeSize(*itemType))
- , Builder_(std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool))
- {
- this->Reserve();
- }
-
- void Add(const NUdf::TUnboxedValue& value) override {
- Y_VERIFY_DEBUG(Builder_->length() < MaxLength_);
- if (value) {
- this->Builder_->UnsafeAppend(value.Get<T>());
- } else {
- this->Builder_->UnsafeAppendNull();
- }
- }
-
- NUdf::TUnboxedValuePod Build(bool finish) override {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(this->Builder_->FinishInternal(&result));
- Builder_.reset();
- if (!finish) {
- Builder_ = std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool);
- Reserve();
- }
-
- return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result));
- }
-
-private:
- void Reserve() {
- ARROW_OK(this->Builder_->Reserve(MaxLength_));
- }
-
- static int64_t TypeSize(arrow::DataType& itemType) {
- const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width();
- return arrow::BitUtil::BytesForBits(bits);
- }
-
-private:
- std::unique_ptr<TBuilder> Builder_;
-};
-
-class TTupleBlockBuilder : public TBlockBuilderBase {
-public:
- TTupleBlockBuilder(TComputationContext & ctx, const std::shared_ptr<arrow::DataType>& itemType, size_t maxLength,
- TTupleType* tupleType, TVector<std::unique_ptr<TBlockBuilderBase>>&& children)
- : TBlockBuilderBase(ctx, itemType, maxLength)
- , TupleType_(tupleType)
- , Children_(std::move(children))
{
- bool isOptional;
- MKQL_ENSURE(ConvertArrowType(TupleType_, isOptional, ArrowType_), "Unsupported type");
- Reserve();
}
- void Add(const NUdf::TUnboxedValue& value) final {
- if (!value) {
- NullBitmapBuilder_->UnsafeAppend(false);
- for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
- Children_[i]->Add({});
- }
-
- return;
- }
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto builder = MakeBlockBuilder(ItemType_, ctx.ArrowMemoryPool);
- NullBitmapBuilder_->UnsafeAppend(true);
- auto elements = value.GetElements();
- if (elements) {
- for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
- Children_[i]->Add(elements[i]);
- }
- } else {
- for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
- Children_[i]->Add(value.GetElement(i));
+ for (size_t i = 0; i < builder->MaxLength(); ++i) {
+ auto result = Flow_->GetValue(ctx);
+ if (result.IsFinish() || result.IsYield()) {
+ if (i == 0) {
+ return result.Release();
+ }
+ break;
}
- }
- }
-
- NUdf::TUnboxedValuePod Build(bool finish) final {
- std::vector<arrow::Datum> childrenValues;
- childrenValues.reserve(TupleType_->GetElementsCount());
- for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
- childrenValues.emplace_back(TArrowBlock::From(Children_[i]->Build(finish)).GetDatum());
+ builder->Add(result);
}
- std::shared_ptr<arrow::Buffer> nullBitmap;
- auto length = NullBitmapBuilder_->length();
- auto nullCount = NullBitmapBuilder_->false_count();
- ARROW_OK(NullBitmapBuilder_->Finish(&nullBitmap));
- auto arrayData = arrow::ArrayData::Make(ArrowType_, length, { nullBitmap }, nullCount, 0);
- for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
- arrayData->child_data.push_back(childrenValues[i].array());
- }
-
- arrow::Datum result(arrayData);
-
- NullBitmapBuilder_ = nullptr;
- if (!finish) {
- Reserve();
- }
-
- return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result));
+ return builder->Build(ctx, true);
}
private:
- void Reserve() {
- NullBitmapBuilder_ = std::make_unique<arrow::TypedBufferBuilder<bool>>(&Ctx_.ArrowMemoryPool);
- ARROW_OK(NullBitmapBuilder_->Reserve(MaxLength_));
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow_);
}
private:
- TTupleType* TupleType_;
- std::shared_ptr<arrow::DataType> ArrowType_;
- TVector<std::unique_ptr<TBlockBuilderBase>> Children_;
- std::unique_ptr<arrow::TypedBufferBuilder<bool>> NullBitmapBuilder_;
-};
-
-std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, TType* type) {
- if (type->IsOptional()) {
- type = AS_TYPE(TOptionalType, type)->GetItemType();
- }
-
- if (type->IsTuple()) {
- bool isOptional;
- std::shared_ptr<arrow::DataType> arrowType;
- MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type");
-
- auto tupleType = AS_TYPE(TTupleType, type);
- TVector<std::unique_ptr<TBlockBuilderBase>> children;
- size_t maxLength = MaxBlockSizeInBytes;
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- children.emplace_back(MakeBlockBuilder(ctx, tupleType->GetElementType(i)));
- maxLength = Min(maxLength, children.back()->MaxLength());
- }
-
- return std::make_unique<TTupleBlockBuilder>(ctx, arrowType, maxLength, tupleType, std::move(children));
- }
-
- if (type->IsData()) {
- auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
- switch (slot) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(ctx, arrow::int8());
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(ctx, arrow::uint8());
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(ctx, arrow::int16());
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(ctx, arrow::uint16());
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(ctx, arrow::int32());
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(ctx, arrow::uint32());
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(ctx, arrow::int64());
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(ctx, arrow::uint64());
- case NUdf::EDataSlot::Float:
- return std::make_unique<TFixedSizeBlockBuilder<float, arrow::FloatBuilder>>(ctx, arrow::float32());
- case NUdf::EDataSlot::Double:
- return std::make_unique<TFixedSizeBlockBuilder<double, arrow::DoubleBuilder>>(ctx, arrow::float64());
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
- }
- }
-
- MKQL_ENSURE(false, "Unsupported type");
-}
-
-class TToBlocksWrapper : public TStatelessFlowComputationNode<TToBlocksWrapper> {
-public:
- explicit TToBlocksWrapper(IComputationNode* flow, TType* itemType)
- : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
- , Flow_(flow)
- , ItemType_(itemType)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto builder = MakeBlockBuilder(ctx, ItemType_);
-
- for (size_t i = 0; i < builder->MaxLength(); ++i) {
- auto result = Flow_->GetValue(ctx);
- if (result.IsFinish() || result.IsYield()) {
- if (i == 0) {
- return result.Release();
- }
- break;
- }
- builder->Add(result);
- }
-
- return builder->Build(true);
- }
-
-private:
- void RegisterDependencies() const final {
- FlowDependsOn(Flow_);
- }
-
-private:
- IComputationNode* const Flow_;
- TType* ItemType_;
+ IComputationNode* const Flow_;
+ TType* ItemType_;
};
class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> {
@@ -297,7 +93,7 @@ public:
for (size_t i = 0; i < Width_; ++i) {
if (auto* out = output[i]; out != nullptr) {
- *out = s.Builders_[i]->Build(s.IsFinished_);
+ *out = s.Builders_[i]->Build(ctx, s.IsFinished_);
}
}
@@ -313,7 +109,7 @@ private:
struct TState : public TComputationValue<TState> {
std::vector<NUdf::TUnboxedValue> Values_;
std::vector<NUdf::TUnboxedValue*> ValuePointers_;
- std::vector<std::unique_ptr<TBlockBuilderBase>> Builders_;
+ std::vector<std::unique_ptr<IBlockBuilder>> Builders_;
size_t MaxLength_;
size_t Rows_ = 0;
bool IsFinished_ = false;
@@ -325,7 +121,7 @@ private:
{
for (size_t i = 0; i < types.size(); ++i) {
ValuePointers_[i] = &Values_[i];
- Builders_.push_back(MakeBlockBuilder(ctx, types[i]));
+ Builders_.push_back(MakeBlockBuilder(types[i], ctx.ArrowMemoryPool));
}
MaxLength_ = MaxBlockSizeInBytes;
@@ -353,209 +149,77 @@ private:
const size_t Width_;
};
-class TBlockReaderBase {
+class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
public:
- virtual ~TBlockReaderBase() = default;
+ TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, TType* itemType)
+ : TMutableComputationNode(mutables)
+ , Flow_(flow)
+ , ItemType_(itemType)
+ , StateIndex_(mutables.CurValueIndex++)
+ {
+ }
- virtual NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) = 0;
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto& state = GetState(ctx);
- virtual NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) = 0;
-};
+ for (;;) {
+ auto item = state.GetValue();
+ if (item) {
+ return *item;
+ }
-template <typename T>
-class TFixedSizeBlockReader : public TBlockReaderBase {
-public:
- NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
- if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
- return {};
- }
-
- return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]);
- }
+ auto input = Flow_->GetValue(ctx);
+ if (input.IsFinish()) {
+ return NUdf::TUnboxedValue::MakeFinish();
+ }
+ if (input.IsYield()) {
+ return NUdf::TUnboxedValue::MakeYield();
+ }
- NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
- if (!scalar.is_valid) {
- return {};
+ state.Reset(TArrowBlock::From(input).GetDatum());
}
-
- return NUdf::TUnboxedValuePod(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
}
-};
-class TTupleBlockReader : public TBlockReaderBase {
-public:
- TTupleBlockReader(TVector<std::unique_ptr<TBlockReaderBase>>&& children, const THolderFactory& holderFactory)
- : Children_(std::move(children))
- , HolderFactory_(holderFactory)
- {}
-
- NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
- if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
- return {};
- }
+private:
+ struct TState : public TComputationValue<TState> {
+ using TComputationValue::TComputationValue;
- NUdf::TUnboxedValue* items;
- auto result = Cache_.NewArray(HolderFactory_, Children_.size(), items);
- for (ui32 i = 0; i < Children_.size(); ++i) {
- items[i] = Children_[i]->Get(*data.child_data[i], index);
+ TState(TMemoryUsageInfo* memInfo, TType* itemType, TComputationContext& ctx)
+ : TComputationValue(memInfo)
+ , Reader_(MakeBlockReader(itemType, ctx.HolderFactory))
+ {
}
- return result;
- }
-
- NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
- if (!scalar.is_valid) {
- return {};
+ TMaybe<NUdf::TUnboxedValuePod> GetValue() const {
+ return Reader_->GetNextValue();
}
- const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
-
- NUdf::TUnboxedValue* items;
- auto result = Cache_.NewArray(HolderFactory_, Children_.size(), items);
- for (ui32 i = 0; i < Children_.size(); ++i) {
- items[i] = Children_[i]->GetScalar(*structScalar.value[i]);
+ void Reset(const arrow::Datum& datum) const {
+ MKQL_ENSURE(datum.is_arraylike(), "Expecting array as FromBlocks argument");
+ Reader_->Reset(datum);
}
- return result;
- }
+ private:
+ const std::unique_ptr<IBlockReader> Reader_;
+ };
private:
- TVector<std::unique_ptr<TBlockReaderBase>> Children_;
- const THolderFactory& HolderFactory_;
- TPlainContainerCache Cache_;
-};
-
-std::unique_ptr<TBlockReaderBase> MakeBlockReader(TType* type, const THolderFactory& holderFactory) {
- if (type->IsOptional()) {
- type = AS_TYPE(TOptionalType, type)->GetItemType();
- }
-
- if (type->IsTuple()) {
- auto tupleType = AS_TYPE(TTupleType, type);
- TVector<std::unique_ptr<TBlockReaderBase>> children;
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- children.emplace_back(MakeBlockReader(tupleType->GetElementType(i), holderFactory));
- }
-
- return std::make_unique<TTupleBlockReader>(std::move(children), holderFactory);
+ void RegisterDependencies() const final {
+ this->DependsOn(Flow_);
}
- if (type->IsData()) {
- auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
- switch (slot) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TFixedSizeBlockReader<i8>>();
- case NUdf::EDataSlot::Bool:
- case NUdf::EDataSlot::Uint8:
- return std::make_unique<TFixedSizeBlockReader<ui8>>();
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TFixedSizeBlockReader<i16>>();
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TFixedSizeBlockReader<ui16>>();
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TFixedSizeBlockReader<i32>>();
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TFixedSizeBlockReader<ui32>>();
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- return std::make_unique<TFixedSizeBlockReader<i64>>();
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TFixedSizeBlockReader<ui64>>();
- case NUdf::EDataSlot::Float:
- return std::make_unique<TFixedSizeBlockReader<float>>();
- case NUdf::EDataSlot::Double:
- return std::make_unique<TFixedSizeBlockReader<double>>();
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
+ TState& GetState(TComputationContext& ctx) const {
+ auto& result = ctx.MutableValues[StateIndex_];
+ if (!result.HasValue()) {
+ result = ctx.HolderFactory.Create<TState>(ItemType_, ctx);
}
+ return *static_cast<TState*>(result.AsBoxed().Get());
}
- MKQL_ENSURE(false, "Unsupported type");
-}
-
-class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
-public:
- TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, TType* itemType)
- : TMutableComputationNode(mutables)
- , Flow_(flow)
- , ItemType_(itemType)
- , StateIndex_(mutables.CurValueIndex++)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto& state = GetState(ctx);
-
- if (state.Array_ == nullptr || state.Index_ == state.Array_->length) {
- auto result = Flow_->GetValue(ctx);
- if (result.IsFinish()) {
- return NUdf::TUnboxedValue::MakeFinish();
- }
- if (result.IsYield()) {
- return NUdf::TUnboxedValue::MakeYield();
- }
- state.Array_ = TArrowBlock::From(result).GetDatum().array();
- state.Index_ = 0;
- }
-
- const auto result = state.GetValue();
- ++state.Index_;
- return result;
- }
-
-private:
- struct TState : public TComputationValue<TState> {
- using TComputationValue::TComputationValue;
-
- TState(TMemoryUsageInfo* memInfo, TType* itemType, TComputationContext& ctx)
- : TComputationValue(memInfo)
- {
- Reader_ = MakeBlockReader(itemType, ctx.HolderFactory);
- }
-
- NUdf::TUnboxedValuePod GetValue() const {
- const auto nullCount = Array_->GetNullCount();
-
- return nullCount == Array_->length || (nullCount > 0 && !HasValue())
- ? NUdf::TUnboxedValuePod()
- : DoGetValue();
- }
-
- private:
- NUdf::TUnboxedValuePod DoGetValue() const {
- return Reader_->Get(*Array_, Index_);
- }
-
- bool HasValue() const {
- return arrow::BitUtil::GetBit(Array_->GetValues<uint8_t>(0, 0), Index_ + Array_->offset);
- }
-
- std::unique_ptr<TBlockReaderBase> Reader_;
- public:
- std::shared_ptr<arrow::ArrayData> Array_{ nullptr };
- size_t Index_{ 0 };
- };
-
-private:
- void RegisterDependencies() const final {
- this->DependsOn(Flow_);
- }
-
- TState& GetState(TComputationContext& ctx) const {
- auto& result = ctx.MutableValues[StateIndex_];
- if (!result.HasValue()) {
- result = ctx.HolderFactory.Create<TState>(ItemType_, ctx);
- }
- return *static_cast<TState*>(result.AsBoxed().Get());
- }
-
-private:
- IComputationNode* const Flow_;
- TType* ItemType_;
- const ui32 StateIndex_;
+private:
+ IComputationNode* const Flow_;
+ TType* ItemType_;
+ const ui32 StateIndex_;
};
class TWideFromBlocksWrapper : public TStatefulWideFlowComputationNode<TWideFromBlocksWrapper> {
@@ -575,12 +239,7 @@ public:
NUdf::TUnboxedValue*const* output) const
{
auto& s = GetState(state, ctx);
- while (s.Index_ == s.Count_) {
- for (size_t i = 0; i < Width_; ++i) {
- s.Arrays_[i] = nullptr;
- s.Scalars_[i] = nullptr;
- }
-
+ if (s.Index_ == s.Count_) {
auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
if (result != EFetchResult::One) {
return result;
@@ -589,11 +248,7 @@ public:
s.Index_ = 0;
for (size_t i = 0; i < Width_; ++i) {
const auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum();
- if (datum.is_scalar()) {
- s.Scalars_[i] = datum.scalar();
- } else {
- s.Arrays_[i] = datum.array();
- }
+ s.Readers_[i]->Reset(datum);
}
s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
@@ -604,22 +259,9 @@ public:
continue;
}
- const auto& array = s.Arrays_[i];
- if (array) {
- const auto nullCount = array->GetNullCount();
- if (nullCount == array->length || (nullCount > 0 && !arrow::BitUtil::GetBit(array->GetValues<uint8_t>(0, 0), s.Index_ + array->offset))) {
- *(output[i]) = NUdf::TUnboxedValue();
- } else {
- *(output[i]) = s.Readers_[i]->Get(*array, s.Index_);
- }
- } else {
- const auto& scalar = s.Scalars_[i];
- if (!scalar->is_valid) {
- *(output[i]) = NUdf::TUnboxedValue();
- } else {
- *(output[i]) = s.Readers_[i]->GetScalar(*scalar);
- }
- }
+ auto result = s.Readers_[i]->GetNextValue();
+ Y_VERIFY(result);
+ *(output[i]) = *result;
}
++s.Index_;
@@ -630,9 +272,7 @@ private:
struct TState : public TComputationValue<TState> {
TVector<NUdf::TUnboxedValue> Values_;
TVector<NUdf::TUnboxedValue*> ValuePointers_;
- TVector<std::shared_ptr<arrow::ArrayData>> Arrays_;
- TVector<std::shared_ptr<arrow::Scalar>> Scalars_;
- TVector<std::unique_ptr<TBlockReaderBase>> Readers_;
+ TVector<std::unique_ptr<IBlockReader>> Readers_;
size_t Count_ = 0;
size_t Index_ = 0;
@@ -640,8 +280,6 @@ private:
: TComputationValue(memInfo)
, Values_(types.size() + 1)
, ValuePointers_(types.size() + 1)
- , Arrays_(types.size())
- , Scalars_(types.size())
{
for (size_t i = 0; i < types.size() + 1; ++i) {
ValuePointers_[i] = &Values_[i];
@@ -685,7 +323,7 @@ public:
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
auto value = Arg_->GetValue(ctx);
- arrow::Datum result = ConvertScalar(Type_, value);
+ arrow::Datum result = ConvertScalar(Type_, value, ctx);
return ctx.HolderFactory.CreateArrowBlock(std::move(result));
}
@@ -694,7 +332,7 @@ private:
DependsOn(Arg_);
}
- arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value) const {
+ arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, TComputationContext& ctx) const {
if (!value) {
bool isOptional;
std::shared_ptr<arrow::DataType> arrowType;
@@ -714,7 +352,7 @@ private:
std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- arrowValue.emplace_back(ConvertScalar(tupleType->GetElementType(i), value.GetElement(i)).scalar());
+ arrowValue.emplace_back(ConvertScalar(tupleType->GetElementType(i), value.GetElement(i), ctx).scalar());
}
return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType));
@@ -748,6 +386,15 @@ private:
return arrow::Datum(static_cast<float>(value.Get<float>()));
case NUdf::EDataSlot::Double:
return arrow::Datum(static_cast<double>(value.Get<double>()));
+ case NUdf::EDataSlot::String:
+ case NUdf::EDataSlot::Utf8: {
+ const auto& str = value.AsStringRef();
+ std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(str.Size(), &ctx.ArrowMemoryPool)));
+ std::memcpy(buffer->mutable_data(), str.Data(), str.Size());
+ auto type = (slot == NUdf::EDataSlot::String) ? arrow::binary() : arrow::utf8();
+ std::shared_ptr<arrow::Scalar> scalar = std::make_shared<arrow::BinaryScalar>(buffer, type);
+ return arrow::Datum(scalar);
+ }
default:
MKQL_ENSURE(false, "Unsupported data slot");
}
@@ -761,13 +408,13 @@ private:
TType* Type_;
};
-}
+} // namespace
-IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), flowType->GetItemType());
-}
+IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), flowType->GetItemType());
+}
IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp
index b04bc43ec35..dba908a7069 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_type_builder.cpp
@@ -1350,6 +1350,12 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty
case NUdf::EDataSlot::Double:
type = arrow::float64();
return true;
+ case NUdf::EDataSlot::String:
+ type = arrow::binary();
+ return true;
+ case NUdf::EDataSlot::Utf8:
+ type = arrow::utf8();
+ return true;
default:
return false;
}