aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <ziganshinmr@yandex-team.com>2025-01-20 12:28:13 +0300
committerziganshinmr <ziganshinmr@yandex-team.com>2025-01-20 12:47:42 +0300
commitf7ffd0055251048ccfe299d5e7518f95b4b79bd6 (patch)
treea703380f3dfc98cfa9f170bd02d2124f436f32c0
parentc345883bbb07a2f0e3c4c4fbb75b9e468627e834 (diff)
downloadydb-f7ffd0055251048ccfe299d5e7518f95b4b79bd6.tar.gz
Use type dispatching traits in Arrow array builders
commit_hash:ffdd14a35d07660b4bebf2a6471da83349a37d78
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp2
-rw-r--r--yql/essentials/minikql/computation/mkql_block_reader.cpp4
-rw-r--r--yql/essentials/minikql/computation/mkql_block_transport.cpp9
-rw-r--r--yql/essentials/minikql/computation/mkql_block_trimmer.cpp6
-rw-r--r--yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp1
-rw-r--r--yql/essentials/minikql/mkql_type_builder.cpp9
-rw-r--r--yql/essentials/public/udf/arrow/block_builder.h234
-rw-r--r--yql/essentials/public/udf/arrow/block_item_comparator.h7
-rw-r--r--yql/essentials/public/udf/arrow/block_reader.h187
-rw-r--r--yql/essentials/public/udf/arrow/dispatch_traits.h237
-rw-r--r--yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp23
-rw-r--r--yql/essentials/public/udf/arrow/ya.make1
-rw-r--r--yt/yql/providers/yt/codec/yt_arrow_converter.cpp8
-rw-r--r--yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp3
15 files changed, 348 insertions, 387 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index d32fec8e89..b9836c0eca 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -97,7 +97,7 @@ constexpr TIn InitialStateValue() {
}
} else if constexpr (std::is_same_v<TIn, NYql::NDecimal::TInt128>) {
if constexpr (IsMin) {
- return NYql::NDecimal::Nan();
+ return NYql::NDecimal::Nan();
} else {
return -NYql::NDecimal::Inf();
}
@@ -131,7 +131,7 @@ class TColumnBuilder : public IAggColumnBuilder {
using TStateType = TState<IsNullable, TIn, IsMin>;
public:
TColumnBuilder(ui64 size, TType* type, TComputationContext& ctx)
- : Builder_(TTypeInfoHelper(), type, ctx.ArrowMemoryPool, size)
+ : Builder_(type, TTypeInfoHelper(), ctx.ArrowMemoryPool, size)
, Ctx_(ctx)
{
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp
index 5b550ecd2f..c3efcd3c84 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -45,7 +45,7 @@ public:
using TStateType = TSumState<IsNullable, TSum>;
TSumColumnBuilder(ui64 size, TType* dataType, TComputationContext& ctx)
- : Builder_(TTypeInfoHelper(), dataType, ctx.ArrowMemoryPool, size)
+ : Builder_(dataType, TTypeInfoHelper(), ctx.ArrowMemoryPool, size)
, Ctx_(ctx)
{
}
diff --git a/yql/essentials/minikql/computation/mkql_block_reader.cpp b/yql/essentials/minikql/computation/mkql_block_reader.cpp
index d718096f70..4e2060e739 100644
--- a/yql/essentials/minikql/computation/mkql_block_reader.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_reader.cpp
@@ -286,6 +286,8 @@ struct TConverterTraits {
template<typename TTzDate, bool Nullable>
using TTzDateConverter = TTzDateBlockItemConverter<TTzDate, Nullable>;
+ constexpr static bool PassType = false;
+
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
if (desc.PassByValue) {
return std::make_unique<TFixedSize<ui64, true>>();
@@ -328,7 +330,7 @@ struct TConverterTraits {
} // namespace
std::unique_ptr<IBlockItemConverter> MakeBlockItemConverter(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, const NUdf::IPgBuilder& pgBuilder) {
- return NYql::NUdf::MakeBlockReaderImpl<TConverterTraits>(typeInfoHelper, type, &pgBuilder);
+ return NYql::NUdf::DispatchByArrowTraits<TConverterTraits>(typeInfoHelper, type, &pgBuilder);
}
} // namespace NMiniKQL
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp
index d68029c315..a03a5027e8 100644
--- a/yql/essentials/minikql/computation/mkql_block_transport.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp
@@ -2,7 +2,7 @@
#include "mkql_block_builder.h"
#include <yql/essentials/minikql/mkql_type_builder.h>
-#include <yql/essentials/public/udf/arrow/block_reader.h>
+#include <yql/essentials/public/udf/arrow/dispatch_traits.h>
#include <yql/essentials/public/udf/arrow/memory_pool.h>
#include <yql/essentials/utils/yql_panic.h>
@@ -633,6 +633,7 @@ struct TSerializerTraits {
template<typename TTzDateType, bool Nullable>
using TTzDate = TTzDateBlockSerializer<TTzDateType, Nullable>;
+ constexpr static bool PassType = false;
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
@@ -670,6 +671,8 @@ struct TDeserializerTraits {
template<typename TTzDateType, bool Nullable>
using TTzDate = TTzDateBlockDeserializer<TTzDateType, Nullable>;
+ constexpr static bool PassType = false;
+
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
if (desc.PassByValue) {
@@ -698,11 +701,11 @@ struct TDeserializerTraits {
std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
- return NYql::NUdf::MakeBlockReaderImpl<TSerializerTraits>(typeInfoHelper, type, nullptr);
+ return NYql::NUdf::DispatchByArrowTraits<TSerializerTraits>(typeInfoHelper, type, nullptr);
}
std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
- std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::MakeBlockReaderImpl<TDeserializerTraits>(typeInfoHelper, type, nullptr);
+ std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::DispatchByArrowTraits<TDeserializerTraits>(typeInfoHelper, type, nullptr);
result->SetArrowType(NYql::NUdf::GetArrowType(typeInfoHelper, type));
return std::move(result);
}
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
index 02baba2c23..b53a3890a4 100644
--- a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
@@ -2,8 +2,8 @@
#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/public/decimal/yql_decimal.h>
-#include <yql/essentials/public/udf/arrow/block_reader.h>
#include <yql/essentials/public/udf/arrow/defs.h>
+#include <yql/essentials/public/udf/arrow/dispatch_traits.h>
#include <yql/essentials/public/udf/arrow/util.h>
#include <yql/essentials/public/udf/udf_type_inspection.h>
#include <yql/essentials/public/udf/udf_value.h>
@@ -218,6 +218,8 @@ struct TTrimmerTraits {
template<typename TTzDate, bool Nullable>
using TTzDateReader = TTzDateBlockTrimmer<TTzDate, Nullable>;
+ constexpr static bool PassType = false;
+
static TResult::TPtr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool* pool) {
Y_UNUSED(pgBuilder);
if (desc.PassByValue) {
@@ -246,7 +248,7 @@ struct TTrimmerTraits {
};
IBlockTrimmer::TPtr MakeBlockTrimmer(const NUdf::ITypeInfoHelper& typeInfoHelper, const NUdf::TType* type, arrow::MemoryPool* pool) {
- return MakeBlockReaderImpl<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool);
+ return DispatchByArrowTraits<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool);
}
}
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp
index 4ebb74f3ec..6c2ecb6b16 100644
--- a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp
@@ -3,6 +3,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <yql/essentials/public/udf/arrow/block_builder.h>
+#include <yql/essentials/public/udf/arrow/block_reader.h>
#include <yql/essentials/public/udf/arrow/memory_pool.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
#include <yql/essentials/minikql/mkql_function_registry.h>
diff --git a/yql/essentials/minikql/mkql_type_builder.cpp b/yql/essentials/minikql/mkql_type_builder.cpp
index 1fc58bf87e..d1df31a97d 100644
--- a/yql/essentials/minikql/mkql_type_builder.cpp
+++ b/yql/essentials/minikql/mkql_type_builder.cpp
@@ -6,6 +6,7 @@
#include <yql/essentials/public/udf/udf_type_ops.h>
#include <yql/essentials/public/udf/arrow/block_item_comparator.h>
#include <yql/essentials/public/udf/arrow/block_item_hasher.h>
+#include <yql/essentials/public/udf/arrow/dispatch_traits.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
@@ -2552,6 +2553,8 @@ struct TComparatorTraits {
template <typename T, bool Nullable>
using TTzDateComparator = NUdf::TTzDateBlockItemComparator<T, Nullable>;
+ constexpr static bool PassType = false;
+
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
return std::unique_ptr<TResult>(MakePgItemComparator(desc.TypeId).Release());
@@ -2584,6 +2587,8 @@ struct THasherTraits {
template <typename T, bool Nullable>
using TTzDateHasher = NYql::NUdf::TTzDateBlockItemHasher<T, Nullable>;
+ constexpr static bool PassType = false;
+
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
return std::unique_ptr<TResult>(MakePgItemHasher(desc.TypeId).Release());
@@ -2605,11 +2610,11 @@ struct THasherTraits {
};
NUdf::IBlockItemComparator::TPtr TBlockTypeHelper::MakeComparator(NUdf::TType* type) const {
- return NUdf::MakeBlockReaderImpl<TComparatorTraits>(TTypeInfoHelper(), type, nullptr).release();
+ return NUdf::DispatchByArrowTraits<TComparatorTraits>(TTypeInfoHelper(), type, nullptr).release();
}
NUdf::IBlockItemHasher::TPtr TBlockTypeHelper::MakeHasher(NUdf::TType* type) const {
- return NUdf::MakeBlockReaderImpl<THasherTraits>(TTypeInfoHelper(), type, nullptr).release();
+ return NUdf::DispatchByArrowTraits<THasherTraits>(TTypeInfoHelper(), type, nullptr).release();
}
TType* TTypeBuilder::NewVoidType() const {
diff --git a/yql/essentials/public/udf/arrow/block_builder.h b/yql/essentials/public/udf/arrow/block_builder.h
index 93d084d2c2..92f4f7e123 100644
--- a/yql/essentials/public/udf/arrow/block_builder.h
+++ b/yql/essentials/public/udf/arrow/block_builder.h
@@ -4,6 +4,7 @@
#include "bit_util.h"
#include "block_io_buffer.h"
#include "block_item.h"
+#include "dispatch_traits.h"
#include <yql/essentials/public/udf/udf_value.h>
#include <yql/essentials/public/udf/udf_value_builder.h>
@@ -525,7 +526,7 @@ public:
: TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
{}
- TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
+ TFixedSizeArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
: TBase(typeInfoHelper, type, pool, maxLen, params)
{}
@@ -557,7 +558,7 @@ public:
: TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
{}
- TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
+ TFixedSizeArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
: TBase(typeInfoHelper, type, pool, maxLen, params)
{}
@@ -588,7 +589,7 @@ public:
: TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params)
{}
- TResourceArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
+ TResourceArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
: TBase(typeInfoHelper, type, pool, maxLen, params)
{}
@@ -629,7 +630,7 @@ public:
Reserve();
}
- TStringArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
+ TStringArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
: TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
{
Reserve();
@@ -1104,8 +1105,8 @@ class TTupleArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTupleA
using TParams = TArrayBuilderBase::TParams;
public:
- TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen,
- TVector<TArrayBuilderBase::Ptr>&& children, const TParams& params = {})
+ TTupleArrayBuilder(TVector<TArrayBuilderBase::Ptr>&& children, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool,
+ size_t maxLen, const TParams& params = {})
: TBase(typeInfoHelper, type, pool, maxLen, params)
, Children_(std::move(children))
{
@@ -1184,7 +1185,7 @@ class TTzDateArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTzDat
static constexpr auto DataSlot = TDataType<TDate>::Slot;
public:
- TTzDateArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
+ TTzDateArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {})
: TBase(typeInfoHelper, type, pool, maxLen, params)
, DateBuilder_(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen, params)
, TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen, params)
@@ -1242,8 +1243,8 @@ private:
class TExternalOptionalArrayBuilder final : public TArrayBuilderBase {
public:
- TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen,
- std::unique_ptr<TArrayBuilderBase>&& inner, const TParams& params = {})
+ TExternalOptionalArrayBuilder(std::unique_ptr<TArrayBuilderBase>&& inner, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool,
+ size_t maxLen, const TParams& params = {})
: TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params)
, Inner(std::move(inner))
{
@@ -1359,198 +1360,79 @@ private:
using TArrayBuilderParams = TArrayBuilderBase::TParams;
-std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase(
- const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params);
-
-template<bool Nullable>
-inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderImpl(
- const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxLen, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params)
-{
- if constexpr (Nullable) {
- TOptionalTypeInspector typeOpt(typeInfoHelper, type);
- type = typeOpt.GetItemType();
- }
-
- TStructTypeInspector typeStruct(typeInfoHelper, type);
- if (typeStruct) {
- TVector<std::unique_ptr<TArrayBuilderBase>> members;
- for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) {
- const TType* memberType = typeStruct.GetMemberType(i);
- auto memberBuilder = MakeArrayBuilderBase(typeInfoHelper, memberType, pool, maxLen, pgBuilder, params);
- members.push_back(std::move(memberBuilder));
- }
- // XXX: Use Tuple array builder for Struct.
- return std::make_unique<TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, std::move(members), params);
- }
-
- TTupleTypeInspector typeTuple(typeInfoHelper, type);
- if (typeTuple) {
- TVector<std::unique_ptr<TArrayBuilderBase>> children;
- for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
- const TType* childType = typeTuple.GetElementType(i);
- auto childBuilder = MakeArrayBuilderBase(typeInfoHelper, childType, pool, maxLen, pgBuilder, params);
- children.push_back(std::move(childBuilder));
- }
-
- return std::make_unique<TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, std::move(children), params);
- }
-
- TDataTypeInspector typeData(typeInfoHelper, type);
- if (typeData) {
- auto typeId = typeData.GetTypeId();
- switch (GetDataSlot(typeId)) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TFixedSizeArrayBuilder<i8, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- return std::make_unique<TFixedSizeArrayBuilder<ui8, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TFixedSizeArrayBuilder<i16, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TFixedSizeArrayBuilder<ui16, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Int32:
- case NUdf::EDataSlot::Date32:
- return std::make_unique<TFixedSizeArrayBuilder<i32, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TFixedSizeArrayBuilder<ui32, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- case NUdf::EDataSlot::Interval64:
- case NUdf::EDataSlot::Datetime64:
- case NUdf::EDataSlot::Timestamp64:
- return std::make_unique<TFixedSizeArrayBuilder<i64, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TFixedSizeArrayBuilder<ui64, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Float:
- return std::make_unique<TFixedSizeArrayBuilder<float, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Double:
- return std::make_unique<TFixedSizeArrayBuilder<double, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::String:
- case NUdf::EDataSlot::Yson:
- case NUdf::EDataSlot::JsonDocument:
- return std::make_unique<TStringArrayBuilder<arrow::BinaryType, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Utf8:
- case NUdf::EDataSlot::Json:
- return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::TzDate:
- return std::make_unique<TTzDateArrayBuilder<TTzDate, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::TzDatetime:
- return std::make_unique<TTzDateArrayBuilder<TTzDatetime, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::TzTimestamp:
- return std::make_unique<TTzDateArrayBuilder<TTzTimestamp, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::TzDate32:
- return std::make_unique<TTzDateArrayBuilder<TTzDate32, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::TzDatetime64:
- return std::make_unique<TTzDateArrayBuilder<TTzDatetime64, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::TzTimestamp64:
- return std::make_unique<TTzDateArrayBuilder<TTzTimestamp64, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- case NUdf::EDataSlot::Decimal:
- return std::make_unique<TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- default:
- Y_ENSURE(false, "Unsupported data slot");
- }
- }
-
- TResourceTypeInspector resource(typeInfoHelper, type);
- if (resource) {
- return std::make_unique<TResourceArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, params);
- }
-
- TPgTypeInspector typePg(typeInfoHelper, type);
- if (typePg) {
- auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
- if (desc->PassByValue) {
- return std::make_unique<TFixedSizeArrayBuilder<ui64, true>>(typeInfoHelper, type, pool, maxLen, params);
+struct TBuilderTraits {
+ using TResult = TArrayBuilderBase;
+ template <bool Nullable>
+ using TTuple = TTupleArrayBuilder<Nullable>;
+ template <typename T, bool Nullable>
+ using TFixedSize = TFixedSizeArrayBuilder<T, Nullable>;
+ template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal>
+ using TStrings = TStringArrayBuilder<TStringType, Nullable>;
+ using TExtOptional = TExternalOptionalArrayBuilder;
+ template<bool Nullable>
+ using TResource = TResourceArrayBuilder<Nullable>;
+ template<typename TTzDate, bool Nullable>
+ using TTzDateReader = TTzDateArrayBuilder<TTzDate, Nullable>;
+
+ constexpr static bool PassType = true;
+
+ static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
+ if (desc.PassByValue) {
+ return std::make_unique<TFixedSize<ui64, true>>(type, typeInfoHelper, pool, maxLen, params);
} else {
- if (desc->Typelen == -1) {
- auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Text>>(typeInfoHelper, type, pool, maxLen, params);
- ret->SetPgBuilder(pgBuilder, desc->Typelen);
+ if (desc.Typelen == -1) {
+ auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Text>>(type, typeInfoHelper, pool, maxLen, params);
+ ret->SetPgBuilder(pgBuilder, desc.Typelen);
return ret;
- } else if (desc->Typelen == -2) {
- auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::CString>>(typeInfoHelper, type, pool, maxLen, params);
- ret->SetPgBuilder(pgBuilder, desc->Typelen);
+ } else if (desc.Typelen == -2) {
+ auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::CString>>(type, typeInfoHelper, pool, maxLen, params);
+ ret->SetPgBuilder(pgBuilder, desc.Typelen);
return ret;
} else {
- auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Fixed>>(typeInfoHelper, type, pool, maxLen, params);
- ret->SetPgBuilder(pgBuilder, desc->Typelen);
+ auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Fixed>>(type, typeInfoHelper, pool, maxLen, params);
+ ret->SetPgBuilder(pgBuilder, desc.Typelen);
return ret;
}
}
}
- Y_ENSURE(false, "Unsupported type");
-}
-
-inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase(
- const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params) {
- const TType* unpacked = type;
- TOptionalTypeInspector typeOpt(typeInfoHelper, type);
- if (typeOpt) {
- unpacked = typeOpt.GetItemType();
- }
-
- TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked);
- TPgTypeInspector unpackedPg(typeInfoHelper, unpacked);
- if (unpackedOpt || typeOpt && unpackedPg) {
- // at least 2 levels of optionals
- ui32 nestLevel = 0;
- auto currentType = type;
- auto previousType = type;
- TVector<const TType*> types;
- for (;;) {
- ++nestLevel;
- previousType = currentType;
- types.push_back(currentType);
- TOptionalTypeInspector currentOpt(typeInfoHelper, currentType);
- currentType = currentOpt.GetItemType();
- TOptionalTypeInspector nexOpt(typeInfoHelper, currentType);
- if (!nexOpt) {
- break;
- }
- }
-
- if (TPgTypeInspector(typeInfoHelper, currentType)) {
- previousType = currentType;
- ++nestLevel;
- }
-
- auto builder = MakeArrayBuilderBase(typeInfoHelper, previousType, pool, maxBlockLength, pgBuilder, params);
- for (ui32 i = 1; i < nestLevel; ++i) {
- builder = std::make_unique<TExternalOptionalArrayBuilder>(typeInfoHelper, types[nestLevel - 1 - i], pool, maxBlockLength, std::move(builder), params);
+ static std::unique_ptr<TResult> MakeResource(bool isOptional, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
+ if (isOptional) {
+ return std::make_unique<TResource<true>>(type, typeInfoHelper, pool, maxLen, params);
+ } else {
+ return std::make_unique<TResource<false>>(type, typeInfoHelper, pool, maxLen, params);
}
+ }
- return builder;
- } else {
- if (typeOpt) {
- return MakeArrayBuilderImpl<true>(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, params);
+ template<typename TTzDate>
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
+ if (isOptional) {
+ return std::make_unique<TTzDateReader<TTzDate, true>>(type, typeInfoHelper, pool, maxLen, params);
} else {
- return MakeArrayBuilderImpl<false>(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, params);
+ return std::make_unique<TTzDateReader<TTzDate, false>>(type, typeInfoHelper, pool, maxLen, params);
}
}
-}
+};
inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxBlockLength, const IPgBuilder* pgBuilder) {
- return MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, {});
+ size_t maxBlockLength, const IPgBuilder* pgBuilder)
+{
+ return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, TArrayBuilderParams {});
}
inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated) {
- return MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, {.TotalAllocated = totalAllocated});
+ size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated)
+{
+ return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, TArrayBuilderParams {.TotalAllocated = totalAllocated});
}
inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool,
- size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params) {
- return MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, params);
+ size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params)
+{
+ return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, params);
}
inline std::unique_ptr<IScalarBuilder> MakeScalarBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
diff --git a/yql/essentials/public/udf/arrow/block_item_comparator.h b/yql/essentials/public/udf/arrow/block_item_comparator.h
index a13b9ec7cc..e185b63f66 100644
--- a/yql/essentials/public/udf/arrow/block_item_comparator.h
+++ b/yql/essentials/public/udf/arrow/block_item_comparator.h
@@ -1,8 +1,6 @@
#pragma once
#include "block_item.h"
-#include "block_reader.h"
-
#include <yql/essentials/public/udf/udf_ptr.h>
#include <yql/essentials/public/udf/udf_type_inspection.h>
@@ -179,7 +177,7 @@ public:
bool DoCompare(TBlockItem lhs, TBlockItem rhs) const {
const auto x = lhs.Get<TLayout>();
const auto y = rhs.Get<TLayout>();
-
+
if (x == y) {
const auto tx = lhs.GetTimezoneId();
const auto ty = rhs.GetTimezoneId();
@@ -196,8 +194,7 @@ public:
bool DoEquals(TBlockItem lhs, TBlockItem rhs) const {
return lhs.Get<TLayout>() == rhs.Get<TLayout>() && lhs.GetTimezoneId() == rhs.GetTimezoneId();
}
-
-
+
bool DoLess(TBlockItem lhs, TBlockItem rhs) const {
return std::forward_as_tuple(lhs.Get<TLayout>(), lhs.GetTimezoneId()) < std::forward_as_tuple(rhs.Get<TLayout>(), rhs.GetTimezoneId());
}
diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h
index 77a74b155a..05dd3ce440 100644
--- a/yql/essentials/public/udf/arrow/block_reader.h
+++ b/yql/essentials/public/udf/arrow/block_reader.h
@@ -2,11 +2,12 @@
#include "block_item.h"
#include "block_io_buffer.h"
+#include "dispatch_traits.h"
#include "util.h"
+
#include <arrow/datum.h>
-#include <yql/essentials/public/udf/udf_type_inspection.h>
-#include <yql/essentials/public/udf/udf_value_builder.h>
+#include <yql/essentials/public/decimal/yql_decimal.h>
namespace NYql {
namespace NUdf {
@@ -498,6 +499,8 @@ struct TReaderTraits {
template<typename TTzDate, bool Nullable>
using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>;
+ constexpr static bool PassType = false;
+
static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
if (desc.PassByValue) {
@@ -525,186 +528,8 @@ struct TReaderTraits {
}
};
-template <typename TTraits, typename... TArgs>
-std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children, TArgs... args) {
- if (isOptional) {
- return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), args...);
- } else {
- return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), args...);
- }
-}
-
-template <typename TTraits, typename T, typename... TArgs>
-std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional, TArgs... args) {
- if (isOptional) {
- return std::make_unique<typename TTraits::template TFixedSize<T, true>>(args...);
- } else {
- return std::make_unique<typename TTraits::template TFixedSize<T, false>>(args...);
- }
-}
-
-template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal, typename... TArgs>
-std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional, TArgs... args) {
- if (isOptional) {
- return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(args...);
- } else {
- return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(args...);
- }
-}
-
-template<typename TTraits>
-concept CanInstantiateBlockReaderForDecimal = requires {
- typename TTraits::template TFixedSize<NYql::NDecimal::TInt128, true>;
-};
-
-template <typename TTraits, typename... TArgs>
-std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder, TArgs... args) {
- const TType* unpacked = type;
- TOptionalTypeInspector typeOpt(typeInfoHelper, type);
- bool isOptional = false;
- if (typeOpt) {
- unpacked = typeOpt.GetItemType();
- isOptional = true;
- }
-
- TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked);
- TPgTypeInspector unpackedPg(typeInfoHelper, unpacked);
- if (unpackedOpt || typeOpt && unpackedPg) {
- // at least 2 levels of optionals
- ui32 nestLevel = 0;
- auto currentType = type;
- auto previousType = type;
- for (;;) {
- ++nestLevel;
- previousType = currentType;
- TOptionalTypeInspector currentOpt(typeInfoHelper, currentType);
- currentType = currentOpt.GetItemType();
- TOptionalTypeInspector nexOpt(typeInfoHelper, currentType);
- if (!nexOpt) {
- break;
- }
- }
-
- if (TPgTypeInspector(typeInfoHelper, currentType)) {
- previousType = currentType;
- ++nestLevel;
- }
-
- auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder, args...);
- for (ui32 i = 1; i < nestLevel; ++i) {
- reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), args...);
- }
-
- return reader;
- }
- else {
- type = unpacked;
- }
-
- TStructTypeInspector typeStruct(typeInfoHelper, type);
- if (typeStruct) {
- TVector<std::unique_ptr<typename TTraits::TResult>> members;
- for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) {
- members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder, args...));
- }
- // XXX: Use Tuple block reader for Struct.
- return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members), args...);
- }
-
- TTupleTypeInspector typeTuple(typeInfoHelper, type);
- if (typeTuple) {
- TVector<std::unique_ptr<typename TTraits::TResult>> children;
- for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
- children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder, args...));
- }
-
- return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children), args...);
- }
-
- TDataTypeInspector typeData(typeInfoHelper, type);
- if (typeData) {
- auto typeId = typeData.GetTypeId();
- switch (GetDataSlot(typeId)) {
- case NUdf::EDataSlot::Int8:
- return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional, args...);
- case NUdf::EDataSlot::Bool:
- case NUdf::EDataSlot::Uint8:
- return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional, args...);
- case NUdf::EDataSlot::Int16:
- return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional, args...);
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional, args...);
- case NUdf::EDataSlot::Int32:
- case NUdf::EDataSlot::Date32:
- return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional, args...);
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional, args...);
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- case NUdf::EDataSlot::Interval64:
- case NUdf::EDataSlot::Datetime64:
- case NUdf::EDataSlot::Timestamp64:
- return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional, args...);
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional, args...);
- case NUdf::EDataSlot::Float:
- return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional, args...);
- case NUdf::EDataSlot::Double:
- return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional, args...);
- case NUdf::EDataSlot::String:
- return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional, args...);
- case NUdf::EDataSlot::Yson:
- return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional, args...);
- case NUdf::EDataSlot::JsonDocument:
- return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional, args...);
- case NUdf::EDataSlot::Utf8:
- return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional, args...);
- case NUdf::EDataSlot::Json:
- return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional, args...);
- case NUdf::EDataSlot::TzDate:
- return TTraits::template MakeTzDate<TTzDate>(isOptional, args...);
- case NUdf::EDataSlot::TzDatetime:
- return TTraits::template MakeTzDate<TTzDatetime>(isOptional, args...);
- case NUdf::EDataSlot::TzTimestamp:
- return TTraits::template MakeTzDate<TTzTimestamp>(isOptional, args...);
- case NUdf::EDataSlot::TzDate32:
- return TTraits::template MakeTzDate<TTzDate32>(isOptional, args...);
- case NUdf::EDataSlot::TzDatetime64:
- return TTraits::template MakeTzDate<TTzDatetime64>(isOptional, args...);
- case NUdf::EDataSlot::TzTimestamp64:
- return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional, args...);
- case NUdf::EDataSlot::Decimal: {
- if constexpr (CanInstantiateBlockReaderForDecimal<TTraits>) {
- return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional, args...);
- } else {
- Y_ENSURE(false, "Unsupported data slot");
- }
- }
- case NUdf::EDataSlot::Uuid:
- case NUdf::EDataSlot::DyNumber:
- Y_ENSURE(false, "Unsupported data slot");
- }
- }
-
- TResourceTypeInspector resource(typeInfoHelper, type);
- if (resource) {
- return TTraits::MakeResource(isOptional, args...);
- }
-
- TPgTypeInspector typePg(typeInfoHelper, type);
- if (typePg) {
- auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
- return TTraits::MakePg(*desc, pgBuilder, args...);
- }
-
- Y_ENSURE(false, "Unsupported type");
-}
-
inline std::unique_ptr<IBlockReader> MakeBlockReader(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
- return MakeBlockReaderImpl<TReaderTraits>(typeInfoHelper, type, nullptr);
+ return DispatchByArrowTraits<TReaderTraits>(typeInfoHelper, type, nullptr);
}
inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper, const TType* type, TBlockItemSerializeProps& props) {
diff --git a/yql/essentials/public/udf/arrow/dispatch_traits.h b/yql/essentials/public/udf/arrow/dispatch_traits.h
new file mode 100644
index 0000000000..88c303cc87
--- /dev/null
+++ b/yql/essentials/public/udf/arrow/dispatch_traits.h
@@ -0,0 +1,237 @@
+#pragma once
+
+#include <yql/essentials/public/udf/udf_type_inspection.h>
+#include <yql/essentials/public/udf/udf_value_builder.h>
+
+#include <arrow/type.h>
+
+namespace NYql {
+namespace NUdf {
+
+template <typename TTraits, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> MakeTupleArrowTraitsImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children, const TType* type, TArgs&&... args) {
+ if (isOptional) {
+ if constexpr (TTraits::PassType) {
+ return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), type, std::forward<TArgs>(args)...);
+ } else {
+ return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), std::forward<TArgs>(args)...);
+ }
+ } else {
+ if constexpr (TTraits::PassType) {
+ return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), type, std::forward<TArgs>(args)...);
+ } else {
+ return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), std::forward<TArgs>(args)...);
+ }
+ }
+}
+
+template <typename TTraits, typename T, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> MakeFixedSizeArrowTraitsImpl(bool isOptional, const TType* type, TArgs&&... args) {
+ if (isOptional) {
+ if constexpr (TTraits::PassType) {
+ return std::make_unique<typename TTraits::template TFixedSize<T, true>>(type, std::forward<TArgs>(args)...);
+ } else {
+ return std::make_unique<typename TTraits::template TFixedSize<T, true>>(std::forward<TArgs>(args)...);
+ }
+ } else {
+ if constexpr (TTraits::PassType) {
+ return std::make_unique<typename TTraits::template TFixedSize<T, false>>(type, std::forward<TArgs>(args)...);
+ } else {
+ return std::make_unique<typename TTraits::template TFixedSize<T, false>>(std::forward<TArgs>(args)...);
+ }
+ }
+}
+
+template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> MakeStringArrowTraitsImpl(bool isOptional, const TType* type, TArgs&&... args) {
+ if (isOptional) {
+ if constexpr (TTraits::PassType) {
+ return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(type, std::forward<TArgs>(args)...);
+ } else {
+ return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(std::forward<TArgs>(args)...);
+ }
+ } else {
+ if constexpr (TTraits::PassType) {
+ return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(type, std::forward<TArgs>(args)...);
+ } else {
+ return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(std::forward<TArgs>(args)...);
+ }
+ }
+}
+
+template <typename TTraits, typename TTzDate, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> MakeTzDateArrowTraitsImpl(bool isOptional, const TType* type, TArgs&&... args) {
+ if constexpr (TTraits::PassType) {
+ return TTraits::template MakeTzDate<TTzDate>(isOptional, type, std::forward<TArgs>(args)...);
+ } else {
+ return TTraits::template MakeTzDate<TTzDate>(isOptional, std::forward<TArgs>(args)...);
+ }
+}
+
+template<typename TTraits>
+concept CanInstantiateArrowTraitsForDecimal = requires {
+ typename TTraits::template TFixedSize<NYql::NDecimal::TInt128, true>;
+};
+
+template <typename TTraits, typename... TArgs>
+std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder, TArgs&&... args) {
+ const TType* unpacked = type;
+ TOptionalTypeInspector typeOpt(typeInfoHelper, type);
+ bool isOptional = false;
+ if (typeOpt) {
+ unpacked = typeOpt.GetItemType();
+ isOptional = true;
+ }
+
+ TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked);
+ TPgTypeInspector unpackedPg(typeInfoHelper, unpacked);
+ if (unpackedOpt || typeOpt && unpackedPg) {
+ // at least 2 levels of optionals
+ ui32 nestLevel = 0;
+ auto currentType = type;
+ auto previousType = type;
+ TVector<const TType*> types;
+ for (;;) {
+ ++nestLevel;
+ previousType = currentType;
+ types.push_back(currentType);
+ TOptionalTypeInspector currentOpt(typeInfoHelper, currentType);
+ currentType = currentOpt.GetItemType();
+ TOptionalTypeInspector nexOpt(typeInfoHelper, currentType);
+ if (!nexOpt) {
+ break;
+ }
+ }
+
+ if (TPgTypeInspector(typeInfoHelper, currentType)) {
+ previousType = currentType;
+ ++nestLevel;
+ }
+
+ auto reader = DispatchByArrowTraits<TTraits>(typeInfoHelper, previousType, pgBuilder, std::forward<TArgs>(args)...);
+ for (ui32 i = 1; i < nestLevel; ++i) {
+ if constexpr (TTraits::PassType) {
+ reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), types[nestLevel - 1 - i], std::forward<TArgs>(args)...);
+ } else {
+ reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), std::forward<TArgs>(args)...);
+ }
+ }
+
+ return reader;
+ }
+ else {
+ type = unpacked;
+ }
+
+ TStructTypeInspector typeStruct(typeInfoHelper, type);
+ if (typeStruct) {
+ TVector<std::unique_ptr<typename TTraits::TResult>> members;
+ for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) {
+ members.emplace_back(DispatchByArrowTraits<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder, std::forward<TArgs>(args)...));
+ }
+ // XXX: Use Tuple block reader for Struct.
+ return MakeTupleArrowTraitsImpl<TTraits>(isOptional, std::move(members), type, std::forward<TArgs>(args)...);
+ }
+
+ TTupleTypeInspector typeTuple(typeInfoHelper, type);
+ if (typeTuple) {
+ TVector<std::unique_ptr<typename TTraits::TResult>> children;
+ for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) {
+ children.emplace_back(DispatchByArrowTraits<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder, std::forward<TArgs>(args)...));
+ }
+
+ return MakeTupleArrowTraitsImpl<TTraits>(isOptional, std::move(children), type, std::forward<TArgs>(args)...);
+ }
+
+ TDataTypeInspector typeData(typeInfoHelper, type);
+ if (typeData) {
+ auto typeId = typeData.GetTypeId();
+ switch (GetDataSlot(typeId)) {
+ case NUdf::EDataSlot::Int8:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, i8>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Uint8:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, ui8>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Int16:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, i16>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, ui16>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Int32:
+ case NUdf::EDataSlot::Date32:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, i32>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, ui32>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ case NUdf::EDataSlot::Interval64:
+ case NUdf::EDataSlot::Datetime64:
+ case NUdf::EDataSlot::Timestamp64:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, i64>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, ui64>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Float:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, float>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Double:
+ return MakeFixedSizeArrowTraitsImpl<TTraits, double>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::String:
+ return MakeStringArrowTraitsImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Yson:
+ return MakeStringArrowTraitsImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::JsonDocument:
+ return MakeStringArrowTraitsImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Utf8:
+ return MakeStringArrowTraitsImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Json:
+ return MakeStringArrowTraitsImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::TzDate:
+ return MakeTzDateArrowTraitsImpl<TTraits, TTzDate>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::TzDatetime:
+ return MakeTzDateArrowTraitsImpl<TTraits, TTzDatetime>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::TzTimestamp:
+ return MakeTzDateArrowTraitsImpl<TTraits, TTzTimestamp>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::TzDate32:
+ return MakeTzDateArrowTraitsImpl<TTraits, TTzDate32>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::TzDatetime64:
+ return MakeTzDateArrowTraitsImpl<TTraits, TTzDatetime64>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::TzTimestamp64:
+ return MakeTzDateArrowTraitsImpl<TTraits, TTzTimestamp64>(isOptional, type, std::forward<TArgs>(args)...);
+ case NUdf::EDataSlot::Decimal: {
+ if constexpr (CanInstantiateArrowTraitsForDecimal<TTraits>) {
+ return MakeFixedSizeArrowTraitsImpl<TTraits, NYql::NDecimal::TInt128>(isOptional, type, std::forward<TArgs>(args)...);
+ } else {
+ Y_ENSURE(false, "Unsupported data slot");
+ }
+ }
+ case NUdf::EDataSlot::Uuid:
+ case NUdf::EDataSlot::DyNumber:
+ Y_ENSURE(false, "Unsupported data slot");
+ }
+ }
+
+ TResourceTypeInspector resource(typeInfoHelper, type);
+ if (resource) {
+ if constexpr (TTraits::PassType) {
+ return TTraits::MakeResource(isOptional, type, std::forward<TArgs>(args)...);
+ } else {
+ return TTraits::MakeResource(isOptional, std::forward<TArgs>(args)...);
+ }
+ }
+
+ TPgTypeInspector typePg(typeInfoHelper, type);
+ if (typePg) {
+ auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId());
+ if constexpr (TTraits::PassType) {
+ return TTraits::MakePg(*desc, pgBuilder, type, std::forward<TArgs>(args)...);
+ } else {
+ return TTraits::MakePg(*desc, pgBuilder, std::forward<TArgs>(args)...);
+ }
+ }
+
+ Y_ENSURE(false, "Unsupported type");
+}
+
+}
+}
diff --git a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
index 87fc63ece2..bbb4c134c8 100644
--- a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
+++ b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
@@ -1,6 +1,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <yql/essentials/public/udf/arrow/block_builder.h>
+#include <yql/essentials/public/udf/arrow/block_reader.h>
#include <yql/essentials/public/udf/arrow/memory_pool.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
#include <yql/essentials/minikql/mkql_function_registry.h>
@@ -32,7 +33,7 @@ struct TArrayBuilderTestData {
};
std::unique_ptr<IArrayBuilder> MakeResourceArrayBuilder(TType* resourceType, TArrayBuilderTestData& data) {
- auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), resourceType,
+ auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), resourceType,
*data.ArrowPool, MAX_BLOCK_SIZE, /* pgBuilder */nullptr);
UNIT_ASSERT_C(arrayBuilder, "Failed to make resource arrow array builder");
return arrayBuilder;
@@ -51,7 +52,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
auto value = datum.array()->GetValues<TUnboxedValue>(1)[0];
UNIT_ASSERT(value.IsEmbedded());
- UNIT_ASSERT_VALUES_EQUAL_C(TStringRef(value.AsStringRef()), TStringRef(resource.AsStringRef()),
+ UNIT_ASSERT_VALUES_EQUAL_C(TStringRef(value.AsStringRef()), TStringRef(resource.AsStringRef()),
"Expected equal values after building array");
}
@@ -66,7 +67,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
struct TWithDtor {
int Payload;
std::shared_ptr<int> DestructorCallsCnt;
- TWithDtor(int payload, std::shared_ptr<int> destructorCallsCnt):
+ TWithDtor(int payload, std::shared_ptr<int> destructorCallsCnt):
Payload(payload), DestructorCallsCnt(std::move(destructorCallsCnt)) {
}
~TWithDtor() {
@@ -86,7 +87,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
UNIT_ASSERT_VALUES_EQUAL(datum.length(), 1);
const auto value = datum.array()->GetValues<TUnboxedValuePod>(1)[0];
- auto boxed = value.AsBoxed().Get();
+ auto boxed = value.AsBoxed().Get();
const auto resource = reinterpret_cast<TTestResource*>(boxed);
UNIT_ASSERT_VALUES_EQUAL(resource->Get()->get()->Payload, payload);
}
@@ -110,7 +111,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
} else {
arrayBuilder->Add(TUnboxedValuePod{});
}
- }
+ }
auto datum = arrayBuilder->Build(true);
const auto blockReader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), resourceType);
for (int i = 0; i < 4; i++) {
@@ -125,7 +126,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
}
}
}
-
+
Y_UNIT_TEST(TestBuilderWithReader) {
TArrayBuilderTestData data;
const auto resourceType = data.PgmBuilder.NewResourceType("Test.Resource");
@@ -147,7 +148,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
UNIT_ASSERT_C(std::memcmp(item1.GetRawPtr(), item1AfterRead.GetRawPtr(), sizeof(TBlockItem)) == 0, "Expected UnboxedValue to equal to BlockItem");
UNIT_ASSERT_C(std::memcmp(item2.GetRawPtr(), item2AfterRead.GetRawPtr(), sizeof(TBlockItem)) == 0, "Expected UnboxedValue to equal to BlockItem");
}
-
+
Y_UNIT_TEST(TestBoxedResourceReader) {
TArrayBuilderTestData data;
const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName);
@@ -179,7 +180,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
Y_UNIT_TEST(TestTzDateBuilder_Layout) {
TArrayBuilderTestData data;
const auto tzDateType = data.PgmBuilder.NewDataType(EDataSlot::TzDate);
- const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDateType,
+ const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDateType,
*data.ArrowPool, MAX_BLOCK_SIZE, /* pgBuilder */ nullptr);
auto makeTzDate = [] (ui16 val, ui16 tz) {
@@ -192,7 +193,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
for (auto date: dates) {
arrayBuilder->Add(date);
}
-
+
const auto datum = arrayBuilder->Build(true);
UNIT_ASSERT(datum.is_array());
UNIT_ASSERT_VALUES_EQUAL(datum.length(), dates.size());
@@ -250,7 +251,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
const TBlockItem hugeItem = sItem2.MakeOptional();
- const size_t stringAllocStep =
+ const size_t stringAllocStep =
arrow::BitUtil::RoundUpToMultipleOf64(blockLen + 1) + // String NullMask
arrow::BitUtil::RoundUpToMultipleOf64((blockLen + 1) * 4) + // String Offsets
NMiniKQL::MaxBlockSizeInBytes; // String Data
@@ -283,4 +284,4 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
builder->Build(false);
UNIT_ASSERT_VALUES_EQUAL(totalAllocated, initialAllocated);
}
-} \ No newline at end of file
+}
diff --git a/yql/essentials/public/udf/arrow/ya.make b/yql/essentials/public/udf/arrow/ya.make
index 4bdcdbe3c2..3928b63852 100644
--- a/yql/essentials/public/udf/arrow/ya.make
+++ b/yql/essentials/public/udf/arrow/ya.make
@@ -5,6 +5,7 @@ SRCS(
udf_arrow_helpers.cpp
bit_util.cpp
util.cpp
+ block_builder.cpp
block_reader.cpp
block_item.cpp
block_item_hasher.cpp
diff --git a/yt/yql/providers/yt/codec/yt_arrow_converter.cpp b/yt/yql/providers/yt/codec/yt_arrow_converter.cpp
index b36912cb67..7d85493324 100644
--- a/yt/yql/providers/yt/codec/yt_arrow_converter.cpp
+++ b/yt/yql/providers/yt/codec/yt_arrow_converter.cpp
@@ -3,7 +3,6 @@
#include <yql/essentials/public/udf/arrow/defs.h>
#include <yql/essentials/public/udf/arrow/block_builder.h>
-#include <yql/essentials/public/udf/arrow/block_reader.h>
#include <yql/essentials/utils/yql_panic.h>
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
@@ -461,7 +460,10 @@ struct TComplexTypeYsonReaderTraits {
using TStrings = TStringYsonReader<TStringType, Nullable, OriginalT, Native>;
using TExtOptional = TExternalOptYsonReader<Native>;
- static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* /* pgBuilder */) {
+ constexpr static bool PassType = false;
+
+ static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
+ Y_UNUSED(pgBuilder);
return BuildPgYsonColumnReader(desc);
}
@@ -495,7 +497,7 @@ template<bool Native, bool IsTopOptional>
class TComplexTypeYsonColumnConverter final : public IYtColumnConverter {
public:
TComplexTypeYsonColumnConverter(TYtColumnConverterSettings&& settings) : Settings_(std::move(settings)) {
- Reader_ = NUdf::MakeBlockReaderImpl<TComplexTypeYsonReaderTraits<Native>>(TTypeInfoHelper(), settings.Type, settings.PgBuilder);
+ Reader_ = NUdf::DispatchByArrowTraits<TComplexTypeYsonReaderTraits<Native>>(TTypeInfoHelper(), settings.Type, settings.PgBuilder);
}
arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) {
diff --git a/yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp b/yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp
index c299f8d6ae..4f3c3b46cc 100644
--- a/yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp
+++ b/yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp
@@ -2,8 +2,11 @@
#include <yql/essentials/minikql/mkql_type_builder.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
+#include <yql/essentials/public/udf/arrow/defs.h>
#include <yql/essentials/utils/yql_panic.h>
+#include <arrow/compute/cast.h>
+
namespace NYql {
class TBasicOutputConverter : public IYtOutputColumnConverter {