diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-01-20 12:28:13 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-01-20 12:47:42 +0300 |
commit | f7ffd0055251048ccfe299d5e7518f95b4b79bd6 (patch) | |
tree | a703380f3dfc98cfa9f170bd02d2124f436f32c0 | |
parent | c345883bbb07a2f0e3c4c4fbb75b9e468627e834 (diff) | |
download | ydb-f7ffd0055251048ccfe299d5e7518f95b4b79bd6.tar.gz |
Use type dispatching traits in Arrow array builders
commit_hash:ffdd14a35d07660b4bebf2a6471da83349a37d78
15 files changed, 348 insertions, 387 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp index d32fec8e89..b9836c0eca 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_agg_minmax.cpp @@ -97,7 +97,7 @@ constexpr TIn InitialStateValue() { } } else if constexpr (std::is_same_v<TIn, NYql::NDecimal::TInt128>) { if constexpr (IsMin) { - return NYql::NDecimal::Nan(); + return NYql::NDecimal::Nan(); } else { return -NYql::NDecimal::Inf(); } @@ -131,7 +131,7 @@ class TColumnBuilder : public IAggColumnBuilder { using TStateType = TState<IsNullable, TIn, IsMin>; public: TColumnBuilder(ui64 size, TType* type, TComputationContext& ctx) - : Builder_(TTypeInfoHelper(), type, ctx.ArrowMemoryPool, size) + : Builder_(type, TTypeInfoHelper(), ctx.ArrowMemoryPool, size) , Ctx_(ctx) { } diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp index 5b550ecd2f..c3efcd3c84 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_agg_sum.cpp @@ -45,7 +45,7 @@ public: using TStateType = TSumState<IsNullable, TSum>; TSumColumnBuilder(ui64 size, TType* dataType, TComputationContext& ctx) - : Builder_(TTypeInfoHelper(), dataType, ctx.ArrowMemoryPool, size) + : Builder_(dataType, TTypeInfoHelper(), ctx.ArrowMemoryPool, size) , Ctx_(ctx) { } diff --git a/yql/essentials/minikql/computation/mkql_block_reader.cpp b/yql/essentials/minikql/computation/mkql_block_reader.cpp index d718096f70..4e2060e739 100644 --- a/yql/essentials/minikql/computation/mkql_block_reader.cpp +++ b/yql/essentials/minikql/computation/mkql_block_reader.cpp @@ -286,6 +286,8 @@ struct TConverterTraits { template<typename TTzDate, bool Nullable> using TTzDateConverter = TTzDateBlockItemConverter<TTzDate, Nullable>; + constexpr static bool PassType = false; + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { if (desc.PassByValue) { return std::make_unique<TFixedSize<ui64, true>>(); @@ -328,7 +330,7 @@ struct TConverterTraits { } // namespace std::unique_ptr<IBlockItemConverter> MakeBlockItemConverter(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, const NUdf::IPgBuilder& pgBuilder) { - return NYql::NUdf::MakeBlockReaderImpl<TConverterTraits>(typeInfoHelper, type, &pgBuilder); + return NYql::NUdf::DispatchByArrowTraits<TConverterTraits>(typeInfoHelper, type, &pgBuilder); } } // namespace NMiniKQL diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp index d68029c315..a03a5027e8 100644 --- a/yql/essentials/minikql/computation/mkql_block_transport.cpp +++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp @@ -2,7 +2,7 @@ #include "mkql_block_builder.h" #include <yql/essentials/minikql/mkql_type_builder.h> -#include <yql/essentials/public/udf/arrow/block_reader.h> +#include <yql/essentials/public/udf/arrow/dispatch_traits.h> #include <yql/essentials/public/udf/arrow/memory_pool.h> #include <yql/essentials/utils/yql_panic.h> @@ -633,6 +633,7 @@ struct TSerializerTraits { template<typename TTzDateType, bool Nullable> using TTzDate = TTzDateBlockSerializer<TTzDateType, Nullable>; + constexpr static bool PassType = false; static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -670,6 +671,8 @@ struct TDeserializerTraits { template<typename TTzDateType, bool Nullable> using TTzDate = TTzDateBlockDeserializer<TTzDateType, Nullable>; + constexpr static bool PassType = false; + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); if (desc.PassByValue) { @@ -698,11 +701,11 @@ struct TDeserializerTraits { std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) { - return NYql::NUdf::MakeBlockReaderImpl<TSerializerTraits>(typeInfoHelper, type, nullptr); + return NYql::NUdf::DispatchByArrowTraits<TSerializerTraits>(typeInfoHelper, type, nullptr); } std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) { - std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::MakeBlockReaderImpl<TDeserializerTraits>(typeInfoHelper, type, nullptr); + std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::DispatchByArrowTraits<TDeserializerTraits>(typeInfoHelper, type, nullptr); result->SetArrowType(NYql::NUdf::GetArrowType(typeInfoHelper, type)); return std::move(result); } diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp index 02baba2c23..b53a3890a4 100644 --- a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp +++ b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp @@ -2,8 +2,8 @@ #include <yql/essentials/minikql/arrow/arrow_util.h> #include <yql/essentials/public/decimal/yql_decimal.h> -#include <yql/essentials/public/udf/arrow/block_reader.h> #include <yql/essentials/public/udf/arrow/defs.h> +#include <yql/essentials/public/udf/arrow/dispatch_traits.h> #include <yql/essentials/public/udf/arrow/util.h> #include <yql/essentials/public/udf/udf_type_inspection.h> #include <yql/essentials/public/udf/udf_value.h> @@ -218,6 +218,8 @@ struct TTrimmerTraits { template<typename TTzDate, bool Nullable> using TTzDateReader = TTzDateBlockTrimmer<TTzDate, Nullable>; + constexpr static bool PassType = false; + static TResult::TPtr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool* pool) { Y_UNUSED(pgBuilder); if (desc.PassByValue) { @@ -246,7 +248,7 @@ struct TTrimmerTraits { }; IBlockTrimmer::TPtr MakeBlockTrimmer(const NUdf::ITypeInfoHelper& typeInfoHelper, const NUdf::TType* type, arrow::MemoryPool* pool) { - return MakeBlockReaderImpl<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool); + return DispatchByArrowTraits<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool); } } diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp index 4ebb74f3ec..6c2ecb6b16 100644 --- a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp +++ b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp @@ -3,6 +3,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <yql/essentials/public/udf/arrow/block_builder.h> +#include <yql/essentials/public/udf/arrow/block_reader.h> #include <yql/essentials/public/udf/arrow/memory_pool.h> #include <yql/essentials/minikql/mkql_type_builder.h> #include <yql/essentials/minikql/mkql_function_registry.h> diff --git a/yql/essentials/minikql/mkql_type_builder.cpp b/yql/essentials/minikql/mkql_type_builder.cpp index 1fc58bf87e..d1df31a97d 100644 --- a/yql/essentials/minikql/mkql_type_builder.cpp +++ b/yql/essentials/minikql/mkql_type_builder.cpp @@ -6,6 +6,7 @@ #include <yql/essentials/public/udf/udf_type_ops.h> #include <yql/essentials/public/udf/arrow/block_item_comparator.h> #include <yql/essentials/public/udf/arrow/block_item_hasher.h> +#include <yql/essentials/public/udf/arrow/dispatch_traits.h> #include <library/cpp/containers/stack_vector/stack_vec.h> #include <yql/essentials/minikql/computation/mkql_computation_node_impl.h> @@ -2552,6 +2553,8 @@ struct TComparatorTraits { template <typename T, bool Nullable> using TTzDateComparator = NUdf::TTzDateBlockItemComparator<T, Nullable>; + constexpr static bool PassType = false; + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); return std::unique_ptr<TResult>(MakePgItemComparator(desc.TypeId).Release()); @@ -2584,6 +2587,8 @@ struct THasherTraits { template <typename T, bool Nullable> using TTzDateHasher = NYql::NUdf::TTzDateBlockItemHasher<T, Nullable>; + constexpr static bool PassType = false; + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); return std::unique_ptr<TResult>(MakePgItemHasher(desc.TypeId).Release()); @@ -2605,11 +2610,11 @@ struct THasherTraits { }; NUdf::IBlockItemComparator::TPtr TBlockTypeHelper::MakeComparator(NUdf::TType* type) const { - return NUdf::MakeBlockReaderImpl<TComparatorTraits>(TTypeInfoHelper(), type, nullptr).release(); + return NUdf::DispatchByArrowTraits<TComparatorTraits>(TTypeInfoHelper(), type, nullptr).release(); } NUdf::IBlockItemHasher::TPtr TBlockTypeHelper::MakeHasher(NUdf::TType* type) const { - return NUdf::MakeBlockReaderImpl<THasherTraits>(TTypeInfoHelper(), type, nullptr).release(); + return NUdf::DispatchByArrowTraits<THasherTraits>(TTypeInfoHelper(), type, nullptr).release(); } TType* TTypeBuilder::NewVoidType() const { diff --git a/yql/essentials/public/udf/arrow/block_builder.h b/yql/essentials/public/udf/arrow/block_builder.h index 93d084d2c2..92f4f7e123 100644 --- a/yql/essentials/public/udf/arrow/block_builder.h +++ b/yql/essentials/public/udf/arrow/block_builder.h @@ -4,6 +4,7 @@ #include "bit_util.h" #include "block_io_buffer.h" #include "block_item.h" +#include "dispatch_traits.h" #include <yql/essentials/public/udf/udf_value.h> #include <yql/essentials/public/udf/udf_value_builder.h> @@ -525,7 +526,7 @@ public: : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params) {} - TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) + TFixedSizeArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) : TBase(typeInfoHelper, type, pool, maxLen, params) {} @@ -557,7 +558,7 @@ public: : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params) {} - TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) + TFixedSizeArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) : TBase(typeInfoHelper, type, pool, maxLen, params) {} @@ -588,7 +589,7 @@ public: : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, params) {} - TResourceArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) + TResourceArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) : TBase(typeInfoHelper, type, pool, maxLen, params) {} @@ -629,7 +630,7 @@ public: Reserve(); } - TStringArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) + TStringArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params) { Reserve(); @@ -1104,8 +1105,8 @@ class TTupleArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTupleA using TParams = TArrayBuilderBase::TParams; public: - TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, - TVector<TArrayBuilderBase::Ptr>&& children, const TParams& params = {}) + TTupleArrayBuilder(TVector<TArrayBuilderBase::Ptr>&& children, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, + size_t maxLen, const TParams& params = {}) : TBase(typeInfoHelper, type, pool, maxLen, params) , Children_(std::move(children)) { @@ -1184,7 +1185,7 @@ class TTzDateArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTzDat static constexpr auto DataSlot = TDataType<TDate>::Slot; public: - TTzDateArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) + TTzDateArrayBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TParams& params = {}) : TBase(typeInfoHelper, type, pool, maxLen, params) , DateBuilder_(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen, params) , TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen, params) @@ -1242,8 +1243,8 @@ private: class TExternalOptionalArrayBuilder final : public TArrayBuilderBase { public: - TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, - std::unique_ptr<TArrayBuilderBase>&& inner, const TParams& params = {}) + TExternalOptionalArrayBuilder(std::unique_ptr<TArrayBuilderBase>&& inner, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, + size_t maxLen, const TParams& params = {}) : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params) , Inner(std::move(inner)) { @@ -1359,198 +1360,79 @@ private: using TArrayBuilderParams = TArrayBuilderBase::TParams; -std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase( - const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, - size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params); - -template<bool Nullable> -inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderImpl( - const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, - size_t maxLen, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params) -{ - if constexpr (Nullable) { - TOptionalTypeInspector typeOpt(typeInfoHelper, type); - type = typeOpt.GetItemType(); - } - - TStructTypeInspector typeStruct(typeInfoHelper, type); - if (typeStruct) { - TVector<std::unique_ptr<TArrayBuilderBase>> members; - for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) { - const TType* memberType = typeStruct.GetMemberType(i); - auto memberBuilder = MakeArrayBuilderBase(typeInfoHelper, memberType, pool, maxLen, pgBuilder, params); - members.push_back(std::move(memberBuilder)); - } - // XXX: Use Tuple array builder for Struct. - return std::make_unique<TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, std::move(members), params); - } - - TTupleTypeInspector typeTuple(typeInfoHelper, type); - if (typeTuple) { - TVector<std::unique_ptr<TArrayBuilderBase>> children; - for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) { - const TType* childType = typeTuple.GetElementType(i); - auto childBuilder = MakeArrayBuilderBase(typeInfoHelper, childType, pool, maxLen, pgBuilder, params); - children.push_back(std::move(childBuilder)); - } - - return std::make_unique<TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, std::move(children), params); - } - - TDataTypeInspector typeData(typeInfoHelper, type); - if (typeData) { - auto typeId = typeData.GetTypeId(); - switch (GetDataSlot(typeId)) { - case NUdf::EDataSlot::Int8: - return std::make_unique<TFixedSizeArrayBuilder<i8, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Uint8: - case NUdf::EDataSlot::Bool: - return std::make_unique<TFixedSizeArrayBuilder<ui8, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Int16: - return std::make_unique<TFixedSizeArrayBuilder<i16, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return std::make_unique<TFixedSizeArrayBuilder<ui16, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Int32: - case NUdf::EDataSlot::Date32: - return std::make_unique<TFixedSizeArrayBuilder<i32, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return std::make_unique<TFixedSizeArrayBuilder<ui32, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - case NUdf::EDataSlot::Interval64: - case NUdf::EDataSlot::Datetime64: - case NUdf::EDataSlot::Timestamp64: - return std::make_unique<TFixedSizeArrayBuilder<i64, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return std::make_unique<TFixedSizeArrayBuilder<ui64, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Float: - return std::make_unique<TFixedSizeArrayBuilder<float, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Double: - return std::make_unique<TFixedSizeArrayBuilder<double, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::String: - case NUdf::EDataSlot::Yson: - case NUdf::EDataSlot::JsonDocument: - return std::make_unique<TStringArrayBuilder<arrow::BinaryType, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Utf8: - case NUdf::EDataSlot::Json: - return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::TzDate: - return std::make_unique<TTzDateArrayBuilder<TTzDate, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::TzDatetime: - return std::make_unique<TTzDateArrayBuilder<TTzDatetime, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::TzTimestamp: - return std::make_unique<TTzDateArrayBuilder<TTzTimestamp, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::TzDate32: - return std::make_unique<TTzDateArrayBuilder<TTzDate32, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::TzDatetime64: - return std::make_unique<TTzDateArrayBuilder<TTzDatetime64, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::TzTimestamp64: - return std::make_unique<TTzDateArrayBuilder<TTzTimestamp64, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - case NUdf::EDataSlot::Decimal: - return std::make_unique<TFixedSizeArrayBuilder<NYql::NDecimal::TInt128, Nullable>>(typeInfoHelper, type, pool, maxLen, params); - default: - Y_ENSURE(false, "Unsupported data slot"); - } - } - - TResourceTypeInspector resource(typeInfoHelper, type); - if (resource) { - return std::make_unique<TResourceArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, params); - } - - TPgTypeInspector typePg(typeInfoHelper, type); - if (typePg) { - auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId()); - if (desc->PassByValue) { - return std::make_unique<TFixedSizeArrayBuilder<ui64, true>>(typeInfoHelper, type, pool, maxLen, params); +struct TBuilderTraits { + using TResult = TArrayBuilderBase; + template <bool Nullable> + using TTuple = TTupleArrayBuilder<Nullable>; + template <typename T, bool Nullable> + using TFixedSize = TFixedSizeArrayBuilder<T, Nullable>; + template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal> + using TStrings = TStringArrayBuilder<TStringType, Nullable>; + using TExtOptional = TExternalOptionalArrayBuilder; + template<bool Nullable> + using TResource = TResourceArrayBuilder<Nullable>; + template<typename TTzDate, bool Nullable> + using TTzDateReader = TTzDateArrayBuilder<TTzDate, Nullable>; + + constexpr static bool PassType = true; + + static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) { + if (desc.PassByValue) { + return std::make_unique<TFixedSize<ui64, true>>(type, typeInfoHelper, pool, maxLen, params); } else { - if (desc->Typelen == -1) { - auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Text>>(typeInfoHelper, type, pool, maxLen, params); - ret->SetPgBuilder(pgBuilder, desc->Typelen); + if (desc.Typelen == -1) { + auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Text>>(type, typeInfoHelper, pool, maxLen, params); + ret->SetPgBuilder(pgBuilder, desc.Typelen); return ret; - } else if (desc->Typelen == -2) { - auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::CString>>(typeInfoHelper, type, pool, maxLen, params); - ret->SetPgBuilder(pgBuilder, desc->Typelen); + } else if (desc.Typelen == -2) { + auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::CString>>(type, typeInfoHelper, pool, maxLen, params); + ret->SetPgBuilder(pgBuilder, desc.Typelen); return ret; } else { - auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Fixed>>(typeInfoHelper, type, pool, maxLen, params); - ret->SetPgBuilder(pgBuilder, desc->Typelen); + auto ret = std::make_unique<TStringArrayBuilder<arrow::BinaryType, true, EPgStringType::Fixed>>(type, typeInfoHelper, pool, maxLen, params); + ret->SetPgBuilder(pgBuilder, desc.Typelen); return ret; } } } - Y_ENSURE(false, "Unsupported type"); -} - -inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderBase( - const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, - size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params) { - const TType* unpacked = type; - TOptionalTypeInspector typeOpt(typeInfoHelper, type); - if (typeOpt) { - unpacked = typeOpt.GetItemType(); - } - - TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked); - TPgTypeInspector unpackedPg(typeInfoHelper, unpacked); - if (unpackedOpt || typeOpt && unpackedPg) { - // at least 2 levels of optionals - ui32 nestLevel = 0; - auto currentType = type; - auto previousType = type; - TVector<const TType*> types; - for (;;) { - ++nestLevel; - previousType = currentType; - types.push_back(currentType); - TOptionalTypeInspector currentOpt(typeInfoHelper, currentType); - currentType = currentOpt.GetItemType(); - TOptionalTypeInspector nexOpt(typeInfoHelper, currentType); - if (!nexOpt) { - break; - } - } - - if (TPgTypeInspector(typeInfoHelper, currentType)) { - previousType = currentType; - ++nestLevel; - } - - auto builder = MakeArrayBuilderBase(typeInfoHelper, previousType, pool, maxBlockLength, pgBuilder, params); - for (ui32 i = 1; i < nestLevel; ++i) { - builder = std::make_unique<TExternalOptionalArrayBuilder>(typeInfoHelper, types[nestLevel - 1 - i], pool, maxBlockLength, std::move(builder), params); + static std::unique_ptr<TResult> MakeResource(bool isOptional, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) { + if (isOptional) { + return std::make_unique<TResource<true>>(type, typeInfoHelper, pool, maxLen, params); + } else { + return std::make_unique<TResource<false>>(type, typeInfoHelper, pool, maxLen, params); } + } - return builder; - } else { - if (typeOpt) { - return MakeArrayBuilderImpl<true>(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, params); + template<typename TTzDate> + static std::unique_ptr<TResult> MakeTzDate(bool isOptional, const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) { + if (isOptional) { + return std::make_unique<TTzDateReader<TTzDate, true>>(type, typeInfoHelper, pool, maxLen, params); } else { - return MakeArrayBuilderImpl<false>(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, params); + return std::make_unique<TTzDateReader<TTzDate, false>>(type, typeInfoHelper, pool, maxLen, params); } } -} +}; inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder( const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, - size_t maxBlockLength, const IPgBuilder* pgBuilder) { - return MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, {}); + size_t maxBlockLength, const IPgBuilder* pgBuilder) +{ + return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, TArrayBuilderParams {}); } inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder( const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, - size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated) { - return MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, {.TotalAllocated = totalAllocated}); + size_t maxBlockLength, const IPgBuilder* pgBuilder, size_t* totalAllocated) +{ + return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, TArrayBuilderParams {.TotalAllocated = totalAllocated}); } inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder( const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, - size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params) { - return MakeArrayBuilderBase(typeInfoHelper, type, pool, maxBlockLength, pgBuilder, params); + size_t maxBlockLength, const IPgBuilder* pgBuilder, const TArrayBuilderParams& params) +{ + return DispatchByArrowTraits<TBuilderTraits>(typeInfoHelper, type, pgBuilder, typeInfoHelper, pool, maxBlockLength, params); } inline std::unique_ptr<IScalarBuilder> MakeScalarBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type) { diff --git a/yql/essentials/public/udf/arrow/block_item_comparator.h b/yql/essentials/public/udf/arrow/block_item_comparator.h index a13b9ec7cc..e185b63f66 100644 --- a/yql/essentials/public/udf/arrow/block_item_comparator.h +++ b/yql/essentials/public/udf/arrow/block_item_comparator.h @@ -1,8 +1,6 @@ #pragma once #include "block_item.h" -#include "block_reader.h" - #include <yql/essentials/public/udf/udf_ptr.h> #include <yql/essentials/public/udf/udf_type_inspection.h> @@ -179,7 +177,7 @@ public: bool DoCompare(TBlockItem lhs, TBlockItem rhs) const { const auto x = lhs.Get<TLayout>(); const auto y = rhs.Get<TLayout>(); - + if (x == y) { const auto tx = lhs.GetTimezoneId(); const auto ty = rhs.GetTimezoneId(); @@ -196,8 +194,7 @@ public: bool DoEquals(TBlockItem lhs, TBlockItem rhs) const { return lhs.Get<TLayout>() == rhs.Get<TLayout>() && lhs.GetTimezoneId() == rhs.GetTimezoneId(); } - - + bool DoLess(TBlockItem lhs, TBlockItem rhs) const { return std::forward_as_tuple(lhs.Get<TLayout>(), lhs.GetTimezoneId()) < std::forward_as_tuple(rhs.Get<TLayout>(), rhs.GetTimezoneId()); } diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h index 77a74b155a..05dd3ce440 100644 --- a/yql/essentials/public/udf/arrow/block_reader.h +++ b/yql/essentials/public/udf/arrow/block_reader.h @@ -2,11 +2,12 @@ #include "block_item.h" #include "block_io_buffer.h" +#include "dispatch_traits.h" #include "util.h" + #include <arrow/datum.h> -#include <yql/essentials/public/udf/udf_type_inspection.h> -#include <yql/essentials/public/udf/udf_value_builder.h> +#include <yql/essentials/public/decimal/yql_decimal.h> namespace NYql { namespace NUdf { @@ -498,6 +499,8 @@ struct TReaderTraits { template<typename TTzDate, bool Nullable> using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>; + constexpr static bool PassType = false; + static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); if (desc.PassByValue) { @@ -525,186 +528,8 @@ struct TReaderTraits { } }; -template <typename TTraits, typename... TArgs> -std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children, TArgs... args) { - if (isOptional) { - return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), args...); - } else { - return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), args...); - } -} - -template <typename TTraits, typename T, typename... TArgs> -std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional, TArgs... args) { - if (isOptional) { - return std::make_unique<typename TTraits::template TFixedSize<T, true>>(args...); - } else { - return std::make_unique<typename TTraits::template TFixedSize<T, false>>(args...); - } -} - -template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal, typename... TArgs> -std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional, TArgs... args) { - if (isOptional) { - return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(args...); - } else { - return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(args...); - } -} - -template<typename TTraits> -concept CanInstantiateBlockReaderForDecimal = requires { - typename TTraits::template TFixedSize<NYql::NDecimal::TInt128, true>; -}; - -template <typename TTraits, typename... TArgs> -std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder, TArgs... args) { - const TType* unpacked = type; - TOptionalTypeInspector typeOpt(typeInfoHelper, type); - bool isOptional = false; - if (typeOpt) { - unpacked = typeOpt.GetItemType(); - isOptional = true; - } - - TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked); - TPgTypeInspector unpackedPg(typeInfoHelper, unpacked); - if (unpackedOpt || typeOpt && unpackedPg) { - // at least 2 levels of optionals - ui32 nestLevel = 0; - auto currentType = type; - auto previousType = type; - for (;;) { - ++nestLevel; - previousType = currentType; - TOptionalTypeInspector currentOpt(typeInfoHelper, currentType); - currentType = currentOpt.GetItemType(); - TOptionalTypeInspector nexOpt(typeInfoHelper, currentType); - if (!nexOpt) { - break; - } - } - - if (TPgTypeInspector(typeInfoHelper, currentType)) { - previousType = currentType; - ++nestLevel; - } - - auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder, args...); - for (ui32 i = 1; i < nestLevel; ++i) { - reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), args...); - } - - return reader; - } - else { - type = unpacked; - } - - TStructTypeInspector typeStruct(typeInfoHelper, type); - if (typeStruct) { - TVector<std::unique_ptr<typename TTraits::TResult>> members; - for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) { - members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder, args...)); - } - // XXX: Use Tuple block reader for Struct. - return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members), args...); - } - - TTupleTypeInspector typeTuple(typeInfoHelper, type); - if (typeTuple) { - TVector<std::unique_ptr<typename TTraits::TResult>> children; - for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) { - children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder, args...)); - } - - return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children), args...); - } - - TDataTypeInspector typeData(typeInfoHelper, type); - if (typeData) { - auto typeId = typeData.GetTypeId(); - switch (GetDataSlot(typeId)) { - case NUdf::EDataSlot::Int8: - return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional, args...); - case NUdf::EDataSlot::Bool: - case NUdf::EDataSlot::Uint8: - return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional, args...); - case NUdf::EDataSlot::Int16: - return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional, args...); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional, args...); - case NUdf::EDataSlot::Int32: - case NUdf::EDataSlot::Date32: - return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional, args...); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional, args...); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - case NUdf::EDataSlot::Interval64: - case NUdf::EDataSlot::Datetime64: - case NUdf::EDataSlot::Timestamp64: - return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional, args...); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional, args...); - case NUdf::EDataSlot::Float: - return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional, args...); - case NUdf::EDataSlot::Double: - return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional, args...); - case NUdf::EDataSlot::String: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional, args...); - case NUdf::EDataSlot::Yson: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional, args...); - case NUdf::EDataSlot::JsonDocument: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional, args...); - case NUdf::EDataSlot::Utf8: - return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional, args...); - case NUdf::EDataSlot::Json: - return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional, args...); - case NUdf::EDataSlot::TzDate: - return TTraits::template MakeTzDate<TTzDate>(isOptional, args...); - case NUdf::EDataSlot::TzDatetime: - return TTraits::template MakeTzDate<TTzDatetime>(isOptional, args...); - case NUdf::EDataSlot::TzTimestamp: - return TTraits::template MakeTzDate<TTzTimestamp>(isOptional, args...); - case NUdf::EDataSlot::TzDate32: - return TTraits::template MakeTzDate<TTzDate32>(isOptional, args...); - case NUdf::EDataSlot::TzDatetime64: - return TTraits::template MakeTzDate<TTzDatetime64>(isOptional, args...); - case NUdf::EDataSlot::TzTimestamp64: - return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional, args...); - case NUdf::EDataSlot::Decimal: { - if constexpr (CanInstantiateBlockReaderForDecimal<TTraits>) { - return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional, args...); - } else { - Y_ENSURE(false, "Unsupported data slot"); - } - } - case NUdf::EDataSlot::Uuid: - case NUdf::EDataSlot::DyNumber: - Y_ENSURE(false, "Unsupported data slot"); - } - } - - TResourceTypeInspector resource(typeInfoHelper, type); - if (resource) { - return TTraits::MakeResource(isOptional, args...); - } - - TPgTypeInspector typePg(typeInfoHelper, type); - if (typePg) { - auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId()); - return TTraits::MakePg(*desc, pgBuilder, args...); - } - - Y_ENSURE(false, "Unsupported type"); -} - inline std::unique_ptr<IBlockReader> MakeBlockReader(const ITypeInfoHelper& typeInfoHelper, const TType* type) { - return MakeBlockReaderImpl<TReaderTraits>(typeInfoHelper, type, nullptr); + return DispatchByArrowTraits<TReaderTraits>(typeInfoHelper, type, nullptr); } inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper, const TType* type, TBlockItemSerializeProps& props) { diff --git a/yql/essentials/public/udf/arrow/dispatch_traits.h b/yql/essentials/public/udf/arrow/dispatch_traits.h new file mode 100644 index 0000000000..88c303cc87 --- /dev/null +++ b/yql/essentials/public/udf/arrow/dispatch_traits.h @@ -0,0 +1,237 @@ +#pragma once + +#include <yql/essentials/public/udf/udf_type_inspection.h> +#include <yql/essentials/public/udf/udf_value_builder.h> + +#include <arrow/type.h> + +namespace NYql { +namespace NUdf { + +template <typename TTraits, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeTupleArrowTraitsImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children, const TType* type, TArgs&&... args) { + if (isOptional) { + if constexpr (TTraits::PassType) { + return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), type, std::forward<TArgs>(args)...); + } else { + return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), std::forward<TArgs>(args)...); + } + } else { + if constexpr (TTraits::PassType) { + return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), type, std::forward<TArgs>(args)...); + } else { + return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), std::forward<TArgs>(args)...); + } + } +} + +template <typename TTraits, typename T, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeFixedSizeArrowTraitsImpl(bool isOptional, const TType* type, TArgs&&... args) { + if (isOptional) { + if constexpr (TTraits::PassType) { + return std::make_unique<typename TTraits::template TFixedSize<T, true>>(type, std::forward<TArgs>(args)...); + } else { + return std::make_unique<typename TTraits::template TFixedSize<T, true>>(std::forward<TArgs>(args)...); + } + } else { + if constexpr (TTraits::PassType) { + return std::make_unique<typename TTraits::template TFixedSize<T, false>>(type, std::forward<TArgs>(args)...); + } else { + return std::make_unique<typename TTraits::template TFixedSize<T, false>>(std::forward<TArgs>(args)...); + } + } +} + +template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeStringArrowTraitsImpl(bool isOptional, const TType* type, TArgs&&... args) { + if (isOptional) { + if constexpr (TTraits::PassType) { + return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(type, std::forward<TArgs>(args)...); + } else { + return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(std::forward<TArgs>(args)...); + } + } else { + if constexpr (TTraits::PassType) { + return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(type, std::forward<TArgs>(args)...); + } else { + return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(std::forward<TArgs>(args)...); + } + } +} + +template <typename TTraits, typename TTzDate, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeTzDateArrowTraitsImpl(bool isOptional, const TType* type, TArgs&&... args) { + if constexpr (TTraits::PassType) { + return TTraits::template MakeTzDate<TTzDate>(isOptional, type, std::forward<TArgs>(args)...); + } else { + return TTraits::template MakeTzDate<TTzDate>(isOptional, std::forward<TArgs>(args)...); + } +} + +template<typename TTraits> +concept CanInstantiateArrowTraitsForDecimal = requires { + typename TTraits::template TFixedSize<NYql::NDecimal::TInt128, true>; +}; + +template <typename TTraits, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder, TArgs&&... args) { + const TType* unpacked = type; + TOptionalTypeInspector typeOpt(typeInfoHelper, type); + bool isOptional = false; + if (typeOpt) { + unpacked = typeOpt.GetItemType(); + isOptional = true; + } + + TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked); + TPgTypeInspector unpackedPg(typeInfoHelper, unpacked); + if (unpackedOpt || typeOpt && unpackedPg) { + // at least 2 levels of optionals + ui32 nestLevel = 0; + auto currentType = type; + auto previousType = type; + TVector<const TType*> types; + for (;;) { + ++nestLevel; + previousType = currentType; + types.push_back(currentType); + TOptionalTypeInspector currentOpt(typeInfoHelper, currentType); + currentType = currentOpt.GetItemType(); + TOptionalTypeInspector nexOpt(typeInfoHelper, currentType); + if (!nexOpt) { + break; + } + } + + if (TPgTypeInspector(typeInfoHelper, currentType)) { + previousType = currentType; + ++nestLevel; + } + + auto reader = DispatchByArrowTraits<TTraits>(typeInfoHelper, previousType, pgBuilder, std::forward<TArgs>(args)...); + for (ui32 i = 1; i < nestLevel; ++i) { + if constexpr (TTraits::PassType) { + reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), types[nestLevel - 1 - i], std::forward<TArgs>(args)...); + } else { + reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), std::forward<TArgs>(args)...); + } + } + + return reader; + } + else { + type = unpacked; + } + + TStructTypeInspector typeStruct(typeInfoHelper, type); + if (typeStruct) { + TVector<std::unique_ptr<typename TTraits::TResult>> members; + for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) { + members.emplace_back(DispatchByArrowTraits<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder, std::forward<TArgs>(args)...)); + } + // XXX: Use Tuple block reader for Struct. + return MakeTupleArrowTraitsImpl<TTraits>(isOptional, std::move(members), type, std::forward<TArgs>(args)...); + } + + TTupleTypeInspector typeTuple(typeInfoHelper, type); + if (typeTuple) { + TVector<std::unique_ptr<typename TTraits::TResult>> children; + for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) { + children.emplace_back(DispatchByArrowTraits<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder, std::forward<TArgs>(args)...)); + } + + return MakeTupleArrowTraitsImpl<TTraits>(isOptional, std::move(children), type, std::forward<TArgs>(args)...); + } + + TDataTypeInspector typeData(typeInfoHelper, type); + if (typeData) { + auto typeId = typeData.GetTypeId(); + switch (GetDataSlot(typeId)) { + case NUdf::EDataSlot::Int8: + return MakeFixedSizeArrowTraitsImpl<TTraits, i8>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Uint8: + return MakeFixedSizeArrowTraitsImpl<TTraits, ui8>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Int16: + return MakeFixedSizeArrowTraitsImpl<TTraits, i16>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + return MakeFixedSizeArrowTraitsImpl<TTraits, ui16>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Int32: + case NUdf::EDataSlot::Date32: + return MakeFixedSizeArrowTraitsImpl<TTraits, i32>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + return MakeFixedSizeArrowTraitsImpl<TTraits, ui32>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + case NUdf::EDataSlot::Interval64: + case NUdf::EDataSlot::Datetime64: + case NUdf::EDataSlot::Timestamp64: + return MakeFixedSizeArrowTraitsImpl<TTraits, i64>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + return MakeFixedSizeArrowTraitsImpl<TTraits, ui64>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Float: + return MakeFixedSizeArrowTraitsImpl<TTraits, float>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Double: + return MakeFixedSizeArrowTraitsImpl<TTraits, double>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::String: + return MakeStringArrowTraitsImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Yson: + return MakeStringArrowTraitsImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::JsonDocument: + return MakeStringArrowTraitsImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Utf8: + return MakeStringArrowTraitsImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Json: + return MakeStringArrowTraitsImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::TzDate: + return MakeTzDateArrowTraitsImpl<TTraits, TTzDate>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::TzDatetime: + return MakeTzDateArrowTraitsImpl<TTraits, TTzDatetime>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::TzTimestamp: + return MakeTzDateArrowTraitsImpl<TTraits, TTzTimestamp>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::TzDate32: + return MakeTzDateArrowTraitsImpl<TTraits, TTzDate32>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::TzDatetime64: + return MakeTzDateArrowTraitsImpl<TTraits, TTzDatetime64>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::TzTimestamp64: + return MakeTzDateArrowTraitsImpl<TTraits, TTzTimestamp64>(isOptional, type, std::forward<TArgs>(args)...); + case NUdf::EDataSlot::Decimal: { + if constexpr (CanInstantiateArrowTraitsForDecimal<TTraits>) { + return MakeFixedSizeArrowTraitsImpl<TTraits, NYql::NDecimal::TInt128>(isOptional, type, std::forward<TArgs>(args)...); + } else { + Y_ENSURE(false, "Unsupported data slot"); + } + } + case NUdf::EDataSlot::Uuid: + case NUdf::EDataSlot::DyNumber: + Y_ENSURE(false, "Unsupported data slot"); + } + } + + TResourceTypeInspector resource(typeInfoHelper, type); + if (resource) { + if constexpr (TTraits::PassType) { + return TTraits::MakeResource(isOptional, type, std::forward<TArgs>(args)...); + } else { + return TTraits::MakeResource(isOptional, std::forward<TArgs>(args)...); + } + } + + TPgTypeInspector typePg(typeInfoHelper, type); + if (typePg) { + auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId()); + if constexpr (TTraits::PassType) { + return TTraits::MakePg(*desc, pgBuilder, type, std::forward<TArgs>(args)...); + } else { + return TTraits::MakePg(*desc, pgBuilder, std::forward<TArgs>(args)...); + } + } + + Y_ENSURE(false, "Unsupported type"); +} + +} +} diff --git a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp index 87fc63ece2..bbb4c134c8 100644 --- a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp +++ b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp @@ -1,6 +1,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <yql/essentials/public/udf/arrow/block_builder.h> +#include <yql/essentials/public/udf/arrow/block_reader.h> #include <yql/essentials/public/udf/arrow/memory_pool.h> #include <yql/essentials/minikql/mkql_type_builder.h> #include <yql/essentials/minikql/mkql_function_registry.h> @@ -32,7 +33,7 @@ struct TArrayBuilderTestData { }; std::unique_ptr<IArrayBuilder> MakeResourceArrayBuilder(TType* resourceType, TArrayBuilderTestData& data) { - auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), resourceType, + auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), resourceType, *data.ArrowPool, MAX_BLOCK_SIZE, /* pgBuilder */nullptr); UNIT_ASSERT_C(arrayBuilder, "Failed to make resource arrow array builder"); return arrayBuilder; @@ -51,7 +52,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { auto value = datum.array()->GetValues<TUnboxedValue>(1)[0]; UNIT_ASSERT(value.IsEmbedded()); - UNIT_ASSERT_VALUES_EQUAL_C(TStringRef(value.AsStringRef()), TStringRef(resource.AsStringRef()), + UNIT_ASSERT_VALUES_EQUAL_C(TStringRef(value.AsStringRef()), TStringRef(resource.AsStringRef()), "Expected equal values after building array"); } @@ -66,7 +67,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { struct TWithDtor { int Payload; std::shared_ptr<int> DestructorCallsCnt; - TWithDtor(int payload, std::shared_ptr<int> destructorCallsCnt): + TWithDtor(int payload, std::shared_ptr<int> destructorCallsCnt): Payload(payload), DestructorCallsCnt(std::move(destructorCallsCnt)) { } ~TWithDtor() { @@ -86,7 +87,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { UNIT_ASSERT_VALUES_EQUAL(datum.length(), 1); const auto value = datum.array()->GetValues<TUnboxedValuePod>(1)[0]; - auto boxed = value.AsBoxed().Get(); + auto boxed = value.AsBoxed().Get(); const auto resource = reinterpret_cast<TTestResource*>(boxed); UNIT_ASSERT_VALUES_EQUAL(resource->Get()->get()->Payload, payload); } @@ -110,7 +111,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { } else { arrayBuilder->Add(TUnboxedValuePod{}); } - } + } auto datum = arrayBuilder->Build(true); const auto blockReader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), resourceType); for (int i = 0; i < 4; i++) { @@ -125,7 +126,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { } } } - + Y_UNIT_TEST(TestBuilderWithReader) { TArrayBuilderTestData data; const auto resourceType = data.PgmBuilder.NewResourceType("Test.Resource"); @@ -147,7 +148,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { UNIT_ASSERT_C(std::memcmp(item1.GetRawPtr(), item1AfterRead.GetRawPtr(), sizeof(TBlockItem)) == 0, "Expected UnboxedValue to equal to BlockItem"); UNIT_ASSERT_C(std::memcmp(item2.GetRawPtr(), item2AfterRead.GetRawPtr(), sizeof(TBlockItem)) == 0, "Expected UnboxedValue to equal to BlockItem"); } - + Y_UNIT_TEST(TestBoxedResourceReader) { TArrayBuilderTestData data; const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName); @@ -179,7 +180,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { Y_UNIT_TEST(TestTzDateBuilder_Layout) { TArrayBuilderTestData data; const auto tzDateType = data.PgmBuilder.NewDataType(EDataSlot::TzDate); - const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDateType, + const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDateType, *data.ArrowPool, MAX_BLOCK_SIZE, /* pgBuilder */ nullptr); auto makeTzDate = [] (ui16 val, ui16 tz) { @@ -192,7 +193,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { for (auto date: dates) { arrayBuilder->Add(date); } - + const auto datum = arrayBuilder->Build(true); UNIT_ASSERT(datum.is_array()); UNIT_ASSERT_VALUES_EQUAL(datum.length(), dates.size()); @@ -250,7 +251,7 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { const TBlockItem hugeItem = sItem2.MakeOptional(); - const size_t stringAllocStep = + const size_t stringAllocStep = arrow::BitUtil::RoundUpToMultipleOf64(blockLen + 1) + // String NullMask arrow::BitUtil::RoundUpToMultipleOf64((blockLen + 1) * 4) + // String Offsets NMiniKQL::MaxBlockSizeInBytes; // String Data @@ -283,4 +284,4 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { builder->Build(false); UNIT_ASSERT_VALUES_EQUAL(totalAllocated, initialAllocated); } -}
\ No newline at end of file +} diff --git a/yql/essentials/public/udf/arrow/ya.make b/yql/essentials/public/udf/arrow/ya.make index 4bdcdbe3c2..3928b63852 100644 --- a/yql/essentials/public/udf/arrow/ya.make +++ b/yql/essentials/public/udf/arrow/ya.make @@ -5,6 +5,7 @@ SRCS( udf_arrow_helpers.cpp bit_util.cpp util.cpp + block_builder.cpp block_reader.cpp block_item.cpp block_item_hasher.cpp diff --git a/yt/yql/providers/yt/codec/yt_arrow_converter.cpp b/yt/yql/providers/yt/codec/yt_arrow_converter.cpp index b36912cb67..7d85493324 100644 --- a/yt/yql/providers/yt/codec/yt_arrow_converter.cpp +++ b/yt/yql/providers/yt/codec/yt_arrow_converter.cpp @@ -3,7 +3,6 @@ #include <yql/essentials/public/udf/arrow/defs.h> #include <yql/essentials/public/udf/arrow/block_builder.h> -#include <yql/essentials/public/udf/arrow/block_reader.h> #include <yql/essentials/utils/yql_panic.h> #include <yql/essentials/minikql/mkql_node.h> #include <yql/essentials/minikql/mkql_type_builder.h> @@ -461,7 +460,10 @@ struct TComplexTypeYsonReaderTraits { using TStrings = TStringYsonReader<TStringType, Nullable, OriginalT, Native>; using TExtOptional = TExternalOptYsonReader<Native>; - static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* /* pgBuilder */) { + constexpr static bool PassType = false; + + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { + Y_UNUSED(pgBuilder); return BuildPgYsonColumnReader(desc); } @@ -495,7 +497,7 @@ template<bool Native, bool IsTopOptional> class TComplexTypeYsonColumnConverter final : public IYtColumnConverter { public: TComplexTypeYsonColumnConverter(TYtColumnConverterSettings&& settings) : Settings_(std::move(settings)) { - Reader_ = NUdf::MakeBlockReaderImpl<TComplexTypeYsonReaderTraits<Native>>(TTypeInfoHelper(), settings.Type, settings.PgBuilder); + Reader_ = NUdf::DispatchByArrowTraits<TComplexTypeYsonReaderTraits<Native>>(TTypeInfoHelper(), settings.Type, settings.PgBuilder); } arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) { diff --git a/yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp b/yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp index c299f8d6ae..4f3c3b46cc 100644 --- a/yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp +++ b/yt/yql/providers/yt/codec/yt_arrow_output_converter.cpp @@ -2,8 +2,11 @@ #include <yql/essentials/minikql/mkql_type_builder.h> #include <yql/essentials/minikql/mkql_node_cast.h> +#include <yql/essentials/public/udf/arrow/defs.h> #include <yql/essentials/utils/yql_panic.h> +#include <arrow/compute/cast.h> + namespace NYql { class TBasicOutputConverter : public IYtOutputColumnConverter { |