diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-03-04 16:15:41 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-03-04 16:15:41 +0000 |
commit | b21a377d1f5b24149cf65fd1f8feb44411ae38f9 (patch) | |
tree | 0459a651275d60cf60489d8142f20a8bd5e6a199 /yql/essentials/public/udf/arrow | |
parent | 827cd39b843ead1adfaa20f8a55e2e17da62a4eb (diff) | |
parent | 00325857a11f51ad6b43a4d35f57e85e06866ab6 (diff) | |
download | ydb-b21a377d1f5b24149cf65fd1f8feb44411ae38f9.tar.gz |
Merge pull request #15307 from ydb-platform/merge-libs-250304-1328
Diffstat (limited to 'yql/essentials/public/udf/arrow')
-rw-r--r-- | yql/essentials/public/udf/arrow/block_builder.h | 53 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/block_item.h | 12 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/block_item_comparator.h | 18 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/block_item_hasher.h | 8 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/block_reader.h | 51 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/dispatch_traits.h | 18 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp | 40 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/util.h | 15 |
8 files changed, 210 insertions, 5 deletions
diff --git a/yql/essentials/public/udf/arrow/block_builder.h b/yql/essentials/public/udf/arrow/block_builder.h index 92f4f7e123..baac1842b9 100644 --- a/yql/essentials/public/udf/arrow/block_builder.h +++ b/yql/essentials/public/udf/arrow/block_builder.h @@ -10,6 +10,7 @@ #include <yql/essentials/public/udf/udf_value_builder.h> #include <yql/essentials/public/udf/udf_type_inspection.h> +#include <arrow/array/array_base.h> #include <arrow/datum.h> #include <arrow/c/bridge.h> @@ -1358,6 +1359,53 @@ private: std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; }; +class TSingularBlockBuilder final: public TArrayBuilderBase { +public: + TSingularBlockBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, + size_t maxLen, const TParams& params = {}) + : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params) { + Reserve(); + } + + void DoAdd(NUdf::TUnboxedValuePod value) final { + Y_UNUSED(value); + } + + void DoAdd(TBlockItem value) final { + Y_UNUSED(value); + } + + void DoAdd(TInputBuffer& input) final { + Y_UNUSED(input.PopChar()); + } + + void DoAddDefault() final {} + + void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { + Y_UNUSED(array, sparseBitmap, popCount); + } + + void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final { + Y_UNUSED(array, beginIndex, count); + } + + void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { + Y_UNUSED(array, indexes, count); + } + + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { + TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); + Y_UNUSED(finish); + result->Payload.push_back(arrow::NullArray(GetCurrLen()).data()); + return result; + } + +private: + size_t DoReserve() final { + return 0; + } +}; + using TArrayBuilderParams = TArrayBuilderBase::TParams; struct TBuilderTraits { @@ -1373,6 +1421,7 @@ struct TBuilderTraits { using TResource = TResourceArrayBuilder<Nullable>; template<typename TTzDate, bool Nullable> using TTzDateReader = TTzDateArrayBuilder<TTzDate, Nullable>; + using TSingular = TSingularBlockBuilder; constexpr static bool PassType = true; @@ -1412,6 +1461,10 @@ struct TBuilderTraits { return std::make_unique<TTzDateReader<TTzDate, false>>(type, typeInfoHelper, pool, maxLen, params); } } + + static std::unique_ptr<TResult> MakeSingular(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) { + return std::make_unique<TSingular>(type, typeInfoHelper, pool, maxLen, params); + } }; inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder( diff --git a/yql/essentials/public/udf/arrow/block_item.h b/yql/essentials/public/udf/arrow/block_item.h index 2f9784cd3c..79686b3094 100644 --- a/yql/essentials/public/udf/arrow/block_item.h +++ b/yql/essentials/public/udf/arrow/block_item.h @@ -166,6 +166,18 @@ public: return &Raw; } + static inline TBlockItem Void() { + TBlockItem v; + v.Raw.Simple.Meta = static_cast<ui8>(EMarkers::Embedded); + return v; + } + + static inline TBlockItem Zero() { + TBlockItem v; + v.Raw.Simple.Meta = static_cast<ui8>(EMarkers::Embedded); + return v; + } + inline const void* GetRawPtr() const { return &Raw; diff --git a/yql/essentials/public/udf/arrow/block_item_comparator.h b/yql/essentials/public/udf/arrow/block_item_comparator.h index e185b63f66..ad803799c6 100644 --- a/yql/essentials/public/udf/arrow/block_item_comparator.h +++ b/yql/essentials/public/udf/arrow/block_item_comparator.h @@ -169,6 +169,24 @@ public: } }; +class TSingularTypeBlockItemComparator: public TBlockItemComparatorBase<TSingularTypeBlockItemComparator, /*Nullable=*/false> { +public: + i64 DoCompare(TBlockItem lhs, TBlockItem rhs) const { + Y_UNUSED(lhs, rhs); + return 0; + } + + bool DoEquals(TBlockItem lhs, TBlockItem rhs) const { + Y_UNUSED(lhs, rhs); + return true; + } + + bool DoLess(TBlockItem lhs, TBlockItem rhs) const { + Y_UNUSED(lhs, rhs); + return false; + } +}; + template<typename TTzType, bool Nullable> class TTzDateBlockItemComparator : public TBlockItemComparatorBase<TTzDateBlockItemComparator<TTzType, Nullable>, Nullable> { using TLayout = typename TDataType<TTzType>::TLayout; diff --git a/yql/essentials/public/udf/arrow/block_item_hasher.h b/yql/essentials/public/udf/arrow/block_item_hasher.h index 3f77e27b6f..9108d7b06e 100644 --- a/yql/essentials/public/udf/arrow/block_item_hasher.h +++ b/yql/essentials/public/udf/arrow/block_item_hasher.h @@ -76,6 +76,14 @@ public: } }; +class TSingularTypeBlockItemHaser : public TBlockItemHasherBase<TSingularTypeBlockItemHaser, /*Nullable=*/false> { +public: + ui64 DoHash(TBlockItem value) const { + Y_UNUSED(value); + return 0; + } +}; + template <bool Nullable> class TTupleBlockItemHasher : public TBlockItemHasherBase<TTupleBlockItemHasher<Nullable>, Nullable> { public: diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h index 05dd3ce440..6652df2ac6 100644 --- a/yql/essentials/public/udf/arrow/block_reader.h +++ b/yql/essentials/public/udf/arrow/block_reader.h @@ -424,6 +424,48 @@ private: TFixedSizeBlockReader<ui16, /* Nullable */false> TimezoneReader_; }; +// NOTE: For any singular type we use arrow::null() data type. +// This data type DOES NOT support bit mask so for optional type +// we have to use |TExternalOptional| wrapper. +class TSingularTypeBlockReader: public IBlockReader { +public: + TSingularTypeBlockReader() = default; + + ~TSingularTypeBlockReader() override = default; + + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) override { + Y_UNUSED(data, index); + return TBlockItem::Zero(); + } + + TBlockItem GetScalarItem(const arrow::Scalar& scalar) override { + Y_UNUSED(scalar); + return TBlockItem::Zero(); + } + + ui64 GetDataWeight(const arrow::ArrayData& data) const override { + Y_UNUSED(data); + return 0; + } + + ui64 GetDataWeight(TBlockItem item) const override { + Y_UNUSED(item); + return 0; + } + + ui64 GetDefaultValueWeight() const override { + return 0; + } + + void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const override { + Y_UNUSED(index, data, out); + } + + void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const override { + Y_UNUSED(scalar, out); + } +}; + class TExternalOptionalBlockReader final : public IBlockReader { public: TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner) @@ -498,6 +540,7 @@ struct TReaderTraits { using TResource = TResourceBlockReader<Nullable>; template<typename TTzDate, bool Nullable> using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>; + using TSingularType = TSingularTypeBlockReader; constexpr static bool PassType = false; @@ -518,6 +561,10 @@ struct TReaderTraits { } } + static std::unique_ptr<TResult> MakeSingular() { + return std::make_unique<TSingularType>(); + } + template<typename TTzDate> static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { if (isOptional) { @@ -595,6 +642,10 @@ inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper, return; } + if (IsSingularType(typeInfoHelper, type)) { + return; + } + Y_ENSURE(false, "Unsupported type"); } diff --git a/yql/essentials/public/udf/arrow/dispatch_traits.h b/yql/essentials/public/udf/arrow/dispatch_traits.h index 88c303cc87..87c25b93f5 100644 --- a/yql/essentials/public/udf/arrow/dispatch_traits.h +++ b/yql/essentials/public/udf/arrow/dispatch_traits.h @@ -1,5 +1,6 @@ #pragma once +#include <yql/essentials/public/udf/arrow/util.h> #include <yql/essentials/public/udf/udf_type_inspection.h> #include <yql/essentials/public/udf/udf_value_builder.h> @@ -85,8 +86,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked); TPgTypeInspector unpackedPg(typeInfoHelper, unpacked); - if (unpackedOpt || typeOpt && unpackedPg) { - // at least 2 levels of optionals + if (unpackedOpt || (typeOpt && NeedWrapWithExternalOptional(typeInfoHelper, unpacked))) { ui32 nestLevel = 0; auto currentType = type; auto previousType = type; @@ -103,7 +103,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo } } - if (TPgTypeInspector(typeInfoHelper, currentType)) { + if (NeedWrapWithExternalOptional(typeInfoHelper, currentType)) { previousType = currentType; ++nestLevel; } @@ -118,8 +118,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo } return reader; - } - else { + } else { type = unpacked; } @@ -230,6 +229,15 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo } } + if (IsSingularType(typeInfoHelper, type)) { + Y_ENSURE(!isOptional, "Optional data types are not supported directly for singular type. Please use TExternalOptional wrapper."); + if constexpr (TTraits::PassType) { + return TTraits::MakeSingular(type, std::forward<TArgs>(args)...); + } else { + return TTraits::MakeSingular(std::forward<TArgs>(args)...); + } + } + Y_ENSURE(false, "Unsupported type"); } diff --git a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp index bbb4c134c8..d0851c5e86 100644 --- a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp +++ b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp @@ -220,6 +220,46 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { UNIT_ASSERT_VALUES_EQUAL(item2AfterRead.GetStringRefFromValue(), "234"); } + Y_UNIT_TEST(TestSingularTypeValueBuilderReader) { + TArrayBuilderTestData data; + const auto nullType = data.PgmBuilder.NewNullType(); + + std::shared_ptr<arrow::ArrayData> arrayData = arrow::NullArray{42}.data(); + IArrayBuilder::TArrayDataItem arrayDataItem = {.Data = arrayData.get(), .StartOffset = 0}; + { + const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), nullType, *data.ArrowPool, MAX_BLOCK_SIZE, /*pgBuilder=*/nullptr); + // Check builder. + arrayBuilder->Add(TUnboxedValuePod::Zero()); + arrayBuilder->Add(TBlockItem::Zero()); + arrayBuilder->Add(TBlockItem::Zero(), 4); + TInputBuffer inputBuffer("Just arbitrary string"); + arrayBuilder->Add(inputBuffer); + arrayBuilder->AddMany(*arrayData, /*popCount=*/3u, /*sparseBitmat=*/nullptr, /*bitmapSize=*/arrayData->length); + arrayBuilder->AddMany(&arrayDataItem, /*arrayCount=*/1, /*beginIndex=*/1, /*count=*/3u); + std::vector<ui64> indexes = {1, 5, 7, 10}; + arrayBuilder->AddMany(&arrayDataItem, /*arrayCount=*/1, /*beginIndex=*/indexes.data(), /*count=*/4u); + UNIT_ASSERT_VALUES_EQUAL(arrayBuilder->Build(true).array()->length, 1 + 1 + 4 + 1 + 3 + 3 + 4); + } + + { + // Check reader. + const auto blockReader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), nullType); + + UNIT_ASSERT(blockReader->GetItem(*arrayData, 0)); + UNIT_ASSERT(blockReader->GetScalarItem(arrow::Scalar(arrow::null()))); + UNIT_ASSERT_EQUAL(blockReader->GetDataWeight(*arrayData), 0); + UNIT_ASSERT_EQUAL(blockReader->GetDataWeight(TBlockItem::Zero()), 0); + UNIT_ASSERT_EQUAL(blockReader->GetDefaultValueWeight(), 0); + UNIT_ASSERT_EQUAL(blockReader->GetDefaultValueWeight(), 0); + + TOutputBuffer outputBuffer; + blockReader->SaveItem(*arrayData, 1, outputBuffer); + UNIT_ASSERT(outputBuffer.Finish().empty()); + blockReader->SaveScalarItem(arrow::Scalar(arrow::null()), outputBuffer); + UNIT_ASSERT(outputBuffer.Finish().empty()); + } + } + Y_UNIT_TEST(TestBuilderAllocatedSize) { TArrayBuilderTestData data; const auto optStringType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::String, true); diff --git a/yql/essentials/public/udf/arrow/util.h b/yql/essentials/public/udf/arrow/util.h index f7bdb715f9..e899af26af 100644 --- a/yql/essentials/public/udf/arrow/util.h +++ b/yql/essentials/public/udf/arrow/util.h @@ -12,6 +12,9 @@ #include <functional> +#include <yql/essentials/public/udf/udf_type_inspection.h> +#include <yql/essentials/public/udf/udf_types.h> + namespace NYql { namespace NUdf { @@ -236,5 +239,17 @@ inline void ZeroMemoryContext(void* ptr) { SetMemoryContext(ptr, nullptr); } +inline bool IsSingularType(const ITypeInfoHelper& typeInfoHelper, const TType* type) { + auto kind = typeInfoHelper.GetTypeKind(type); + return kind == ETypeKind::Null || + kind == ETypeKind::Void || + kind == ETypeKind::EmptyDict || + kind == ETypeKind::EmptyList; +} + +inline bool NeedWrapWithExternalOptional(const ITypeInfoHelper& typeInfoHelper, const TType* type) { + return TPgTypeInspector(typeInfoHelper, type) || IsSingularType(typeInfoHelper, type); +} + } // namespace NUdf } // namespace NYql |