aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/public/udf/arrow
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-03-04 16:15:41 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-03-04 16:15:41 +0000
commitb21a377d1f5b24149cf65fd1f8feb44411ae38f9 (patch)
tree0459a651275d60cf60489d8142f20a8bd5e6a199 /yql/essentials/public/udf/arrow
parent827cd39b843ead1adfaa20f8a55e2e17da62a4eb (diff)
parent00325857a11f51ad6b43a4d35f57e85e06866ab6 (diff)
downloadydb-b21a377d1f5b24149cf65fd1f8feb44411ae38f9.tar.gz
Merge pull request #15307 from ydb-platform/merge-libs-250304-1328
Diffstat (limited to 'yql/essentials/public/udf/arrow')
-rw-r--r--yql/essentials/public/udf/arrow/block_builder.h53
-rw-r--r--yql/essentials/public/udf/arrow/block_item.h12
-rw-r--r--yql/essentials/public/udf/arrow/block_item_comparator.h18
-rw-r--r--yql/essentials/public/udf/arrow/block_item_hasher.h8
-rw-r--r--yql/essentials/public/udf/arrow/block_reader.h51
-rw-r--r--yql/essentials/public/udf/arrow/dispatch_traits.h18
-rw-r--r--yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp40
-rw-r--r--yql/essentials/public/udf/arrow/util.h15
8 files changed, 210 insertions, 5 deletions
diff --git a/yql/essentials/public/udf/arrow/block_builder.h b/yql/essentials/public/udf/arrow/block_builder.h
index 92f4f7e123..baac1842b9 100644
--- a/yql/essentials/public/udf/arrow/block_builder.h
+++ b/yql/essentials/public/udf/arrow/block_builder.h
@@ -10,6 +10,7 @@
#include <yql/essentials/public/udf/udf_value_builder.h>
#include <yql/essentials/public/udf/udf_type_inspection.h>
+#include <arrow/array/array_base.h>
#include <arrow/datum.h>
#include <arrow/c/bridge.h>
@@ -1358,6 +1359,53 @@ private:
std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
};
+class TSingularBlockBuilder final: public TArrayBuilderBase {
+public:
+ TSingularBlockBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool,
+ size_t maxLen, const TParams& params = {})
+ : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params) {
+ Reserve();
+ }
+
+ void DoAdd(NUdf::TUnboxedValuePod value) final {
+ Y_UNUSED(value);
+ }
+
+ void DoAdd(TBlockItem value) final {
+ Y_UNUSED(value);
+ }
+
+ void DoAdd(TInputBuffer& input) final {
+ Y_UNUSED(input.PopChar());
+ }
+
+ void DoAddDefault() final {}
+
+ void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
+ Y_UNUSED(array, sparseBitmap, popCount);
+ }
+
+ void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
+ Y_UNUSED(array, beginIndex, count);
+ }
+
+ void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
+ Y_UNUSED(array, indexes, count);
+ }
+
+ TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
+ TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
+ Y_UNUSED(finish);
+ result->Payload.push_back(arrow::NullArray(GetCurrLen()).data());
+ return result;
+ }
+
+private:
+ size_t DoReserve() final {
+ return 0;
+ }
+};
+
using TArrayBuilderParams = TArrayBuilderBase::TParams;
struct TBuilderTraits {
@@ -1373,6 +1421,7 @@ struct TBuilderTraits {
using TResource = TResourceArrayBuilder<Nullable>;
template<typename TTzDate, bool Nullable>
using TTzDateReader = TTzDateArrayBuilder<TTzDate, Nullable>;
+ using TSingular = TSingularBlockBuilder;
constexpr static bool PassType = true;
@@ -1412,6 +1461,10 @@ struct TBuilderTraits {
return std::make_unique<TTzDateReader<TTzDate, false>>(type, typeInfoHelper, pool, maxLen, params);
}
}
+
+ static std::unique_ptr<TResult> MakeSingular(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
+ return std::make_unique<TSingular>(type, typeInfoHelper, pool, maxLen, params);
+ }
};
inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
diff --git a/yql/essentials/public/udf/arrow/block_item.h b/yql/essentials/public/udf/arrow/block_item.h
index 2f9784cd3c..79686b3094 100644
--- a/yql/essentials/public/udf/arrow/block_item.h
+++ b/yql/essentials/public/udf/arrow/block_item.h
@@ -166,6 +166,18 @@ public:
return &Raw;
}
+ static inline TBlockItem Void() {
+ TBlockItem v;
+ v.Raw.Simple.Meta = static_cast<ui8>(EMarkers::Embedded);
+ return v;
+ }
+
+ static inline TBlockItem Zero() {
+ TBlockItem v;
+ v.Raw.Simple.Meta = static_cast<ui8>(EMarkers::Embedded);
+ return v;
+ }
+
inline const void* GetRawPtr() const
{
return &Raw;
diff --git a/yql/essentials/public/udf/arrow/block_item_comparator.h b/yql/essentials/public/udf/arrow/block_item_comparator.h
index e185b63f66..ad803799c6 100644
--- a/yql/essentials/public/udf/arrow/block_item_comparator.h
+++ b/yql/essentials/public/udf/arrow/block_item_comparator.h
@@ -169,6 +169,24 @@ public:
}
};
+class TSingularTypeBlockItemComparator: public TBlockItemComparatorBase<TSingularTypeBlockItemComparator, /*Nullable=*/false> {
+public:
+ i64 DoCompare(TBlockItem lhs, TBlockItem rhs) const {
+ Y_UNUSED(lhs, rhs);
+ return 0;
+ }
+
+ bool DoEquals(TBlockItem lhs, TBlockItem rhs) const {
+ Y_UNUSED(lhs, rhs);
+ return true;
+ }
+
+ bool DoLess(TBlockItem lhs, TBlockItem rhs) const {
+ Y_UNUSED(lhs, rhs);
+ return false;
+ }
+};
+
template<typename TTzType, bool Nullable>
class TTzDateBlockItemComparator : public TBlockItemComparatorBase<TTzDateBlockItemComparator<TTzType, Nullable>, Nullable> {
using TLayout = typename TDataType<TTzType>::TLayout;
diff --git a/yql/essentials/public/udf/arrow/block_item_hasher.h b/yql/essentials/public/udf/arrow/block_item_hasher.h
index 3f77e27b6f..9108d7b06e 100644
--- a/yql/essentials/public/udf/arrow/block_item_hasher.h
+++ b/yql/essentials/public/udf/arrow/block_item_hasher.h
@@ -76,6 +76,14 @@ public:
}
};
+class TSingularTypeBlockItemHaser : public TBlockItemHasherBase<TSingularTypeBlockItemHaser, /*Nullable=*/false> {
+public:
+ ui64 DoHash(TBlockItem value) const {
+ Y_UNUSED(value);
+ return 0;
+ }
+};
+
template <bool Nullable>
class TTupleBlockItemHasher : public TBlockItemHasherBase<TTupleBlockItemHasher<Nullable>, Nullable> {
public:
diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h
index 05dd3ce440..6652df2ac6 100644
--- a/yql/essentials/public/udf/arrow/block_reader.h
+++ b/yql/essentials/public/udf/arrow/block_reader.h
@@ -424,6 +424,48 @@ private:
TFixedSizeBlockReader<ui16, /* Nullable */false> TimezoneReader_;
};
+// NOTE: For any singular type we use arrow::null() data type.
+// This data type DOES NOT support bit mask so for optional type
+// we have to use |TExternalOptional| wrapper.
+class TSingularTypeBlockReader: public IBlockReader {
+public:
+ TSingularTypeBlockReader() = default;
+
+ ~TSingularTypeBlockReader() override = default;
+
+ TBlockItem GetItem(const arrow::ArrayData& data, size_t index) override {
+ Y_UNUSED(data, index);
+ return TBlockItem::Zero();
+ }
+
+ TBlockItem GetScalarItem(const arrow::Scalar& scalar) override {
+ Y_UNUSED(scalar);
+ return TBlockItem::Zero();
+ }
+
+ ui64 GetDataWeight(const arrow::ArrayData& data) const override {
+ Y_UNUSED(data);
+ return 0;
+ }
+
+ ui64 GetDataWeight(TBlockItem item) const override {
+ Y_UNUSED(item);
+ return 0;
+ }
+
+ ui64 GetDefaultValueWeight() const override {
+ return 0;
+ }
+
+ void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const override {
+ Y_UNUSED(index, data, out);
+ }
+
+ void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const override {
+ Y_UNUSED(scalar, out);
+ }
+};
+
class TExternalOptionalBlockReader final : public IBlockReader {
public:
TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
@@ -498,6 +540,7 @@ struct TReaderTraits {
using TResource = TResourceBlockReader<Nullable>;
template<typename TTzDate, bool Nullable>
using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>;
+ using TSingularType = TSingularTypeBlockReader;
constexpr static bool PassType = false;
@@ -518,6 +561,10 @@ struct TReaderTraits {
}
}
+ static std::unique_ptr<TResult> MakeSingular() {
+ return std::make_unique<TSingularType>();
+ }
+
template<typename TTzDate>
static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
if (isOptional) {
@@ -595,6 +642,10 @@ inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper,
return;
}
+ if (IsSingularType(typeInfoHelper, type)) {
+ return;
+ }
+
Y_ENSURE(false, "Unsupported type");
}
diff --git a/yql/essentials/public/udf/arrow/dispatch_traits.h b/yql/essentials/public/udf/arrow/dispatch_traits.h
index 88c303cc87..87c25b93f5 100644
--- a/yql/essentials/public/udf/arrow/dispatch_traits.h
+++ b/yql/essentials/public/udf/arrow/dispatch_traits.h
@@ -1,5 +1,6 @@
#pragma once
+#include <yql/essentials/public/udf/arrow/util.h>
#include <yql/essentials/public/udf/udf_type_inspection.h>
#include <yql/essentials/public/udf/udf_value_builder.h>
@@ -85,8 +86,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo
TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked);
TPgTypeInspector unpackedPg(typeInfoHelper, unpacked);
- if (unpackedOpt || typeOpt && unpackedPg) {
- // at least 2 levels of optionals
+ if (unpackedOpt || (typeOpt && NeedWrapWithExternalOptional(typeInfoHelper, unpacked))) {
ui32 nestLevel = 0;
auto currentType = type;
auto previousType = type;
@@ -103,7 +103,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo
}
}
- if (TPgTypeInspector(typeInfoHelper, currentType)) {
+ if (NeedWrapWithExternalOptional(typeInfoHelper, currentType)) {
previousType = currentType;
++nestLevel;
}
@@ -118,8 +118,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo
}
return reader;
- }
- else {
+ } else {
type = unpacked;
}
@@ -230,6 +229,15 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo
}
}
+ if (IsSingularType(typeInfoHelper, type)) {
+ Y_ENSURE(!isOptional, "Optional data types are not supported directly for singular type. Please use TExternalOptional wrapper.");
+ if constexpr (TTraits::PassType) {
+ return TTraits::MakeSingular(type, std::forward<TArgs>(args)...);
+ } else {
+ return TTraits::MakeSingular(std::forward<TArgs>(args)...);
+ }
+ }
+
Y_ENSURE(false, "Unsupported type");
}
diff --git a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
index bbb4c134c8..d0851c5e86 100644
--- a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
+++ b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
@@ -220,6 +220,46 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
UNIT_ASSERT_VALUES_EQUAL(item2AfterRead.GetStringRefFromValue(), "234");
}
+ Y_UNIT_TEST(TestSingularTypeValueBuilderReader) {
+ TArrayBuilderTestData data;
+ const auto nullType = data.PgmBuilder.NewNullType();
+
+ std::shared_ptr<arrow::ArrayData> arrayData = arrow::NullArray{42}.data();
+ IArrayBuilder::TArrayDataItem arrayDataItem = {.Data = arrayData.get(), .StartOffset = 0};
+ {
+ const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), nullType, *data.ArrowPool, MAX_BLOCK_SIZE, /*pgBuilder=*/nullptr);
+ // Check builder.
+ arrayBuilder->Add(TUnboxedValuePod::Zero());
+ arrayBuilder->Add(TBlockItem::Zero());
+ arrayBuilder->Add(TBlockItem::Zero(), 4);
+ TInputBuffer inputBuffer("Just arbitrary string");
+ arrayBuilder->Add(inputBuffer);
+ arrayBuilder->AddMany(*arrayData, /*popCount=*/3u, /*sparseBitmat=*/nullptr, /*bitmapSize=*/arrayData->length);
+ arrayBuilder->AddMany(&arrayDataItem, /*arrayCount=*/1, /*beginIndex=*/1, /*count=*/3u);
+ std::vector<ui64> indexes = {1, 5, 7, 10};
+ arrayBuilder->AddMany(&arrayDataItem, /*arrayCount=*/1, /*beginIndex=*/indexes.data(), /*count=*/4u);
+ UNIT_ASSERT_VALUES_EQUAL(arrayBuilder->Build(true).array()->length, 1 + 1 + 4 + 1 + 3 + 3 + 4);
+ }
+
+ {
+ // Check reader.
+ const auto blockReader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), nullType);
+
+ UNIT_ASSERT(blockReader->GetItem(*arrayData, 0));
+ UNIT_ASSERT(blockReader->GetScalarItem(arrow::Scalar(arrow::null())));
+ UNIT_ASSERT_EQUAL(blockReader->GetDataWeight(*arrayData), 0);
+ UNIT_ASSERT_EQUAL(blockReader->GetDataWeight(TBlockItem::Zero()), 0);
+ UNIT_ASSERT_EQUAL(blockReader->GetDefaultValueWeight(), 0);
+ UNIT_ASSERT_EQUAL(blockReader->GetDefaultValueWeight(), 0);
+
+ TOutputBuffer outputBuffer;
+ blockReader->SaveItem(*arrayData, 1, outputBuffer);
+ UNIT_ASSERT(outputBuffer.Finish().empty());
+ blockReader->SaveScalarItem(arrow::Scalar(arrow::null()), outputBuffer);
+ UNIT_ASSERT(outputBuffer.Finish().empty());
+ }
+ }
+
Y_UNIT_TEST(TestBuilderAllocatedSize) {
TArrayBuilderTestData data;
const auto optStringType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::String, true);
diff --git a/yql/essentials/public/udf/arrow/util.h b/yql/essentials/public/udf/arrow/util.h
index f7bdb715f9..e899af26af 100644
--- a/yql/essentials/public/udf/arrow/util.h
+++ b/yql/essentials/public/udf/arrow/util.h
@@ -12,6 +12,9 @@
#include <functional>
+#include <yql/essentials/public/udf/udf_type_inspection.h>
+#include <yql/essentials/public/udf/udf_types.h>
+
namespace NYql {
namespace NUdf {
@@ -236,5 +239,17 @@ inline void ZeroMemoryContext(void* ptr) {
SetMemoryContext(ptr, nullptr);
}
+inline bool IsSingularType(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
+ auto kind = typeInfoHelper.GetTypeKind(type);
+ return kind == ETypeKind::Null ||
+ kind == ETypeKind::Void ||
+ kind == ETypeKind::EmptyDict ||
+ kind == ETypeKind::EmptyList;
+}
+
+inline bool NeedWrapWithExternalOptional(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
+ return TPgTypeInspector(typeInfoHelper, type) || IsSingularType(typeInfoHelper, type);
+}
+
} // namespace NUdf
} // namespace NYql