diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2025-03-20 20:49:43 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2025-03-20 21:12:37 +0300 |
commit | 74e00a576ebbd4d83e0d2c7ac1a4be5466cfd042 (patch) | |
tree | 9e0c4648b6bd6a61a1a8795e791068b59eb45cda | |
parent | 2240a7baf6aedfac97f955f5a998c34d135afc43 (diff) | |
download | ydb-74e00a576ebbd4d83e0d2c7ac1a4be5466cfd042.tar.gz |
YQL supported LogicalDataSize in block reader
Supported LogicalDataSize in block reader
commit_hash:db90c1c9c8065e4e1f41ec0fd9cfb7bc992efce2
-rw-r--r-- | yql/essentials/public/udf/arrow/block_reader.h | 112 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/ut/block_reader_ut.cpp | 133 | ||||
-rw-r--r-- | yql/essentials/public/udf/arrow/ut/ya.make | 1 |
3 files changed, 228 insertions, 18 deletions
diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h index 6652df2ac67..dc3e7be87b3 100644 --- a/yql/essentials/public/udf/arrow/block_reader.h +++ b/yql/essentials/public/udf/arrow/block_reader.h @@ -20,6 +20,7 @@ public: virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0; virtual ui64 GetDataWeight(const arrow::ArrayData& data) const = 0; + virtual ui64 GetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const = 0; virtual ui64 GetDataWeight(TBlockItem item) const = 0; virtual ui64 GetDefaultValueWeight() const = 0; @@ -33,8 +34,28 @@ struct TBlockItemSerializeProps { bool IsFixed = true; // true if each block item takes fixed size }; +class TBlockReaderBase : public IBlockReader { +public: + ui64 GetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final { + Y_ENSURE(0 <= offset && offset < data.length); + Y_ENSURE(offset + length >= offset); + Y_ENSURE(offset + length <= data.length); + return DoGetSliceDataWeight(data, offset, length); + } + +protected: + virtual ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const = 0; + + static ui64 GetBitmaskDataWeight(int64_t dataLength) { + if (dataLength <= 0) { + return 0; + } + return (dataLength - 1) / 8 + 1; + } +}; + template<typename T, bool Nullable, typename TDerived> -class TFixedSizeBlockReaderBase : public IBlockReader { +class TFixedSizeBlockReaderBase : public TBlockReaderBase { public: TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { if constexpr (Nullable) { @@ -66,10 +87,12 @@ public: } ui64 GetDataWeight(const arrow::ArrayData& data) const final { - if constexpr (Nullable) { - return (1 + sizeof(T)) * data.length; - } - return sizeof(T) * data.length; + return GetDataWeightImpl(data.length); + } + + ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final { + Y_UNUSED(data, offset); + return GetDataWeightImpl(length); } ui64 GetDataWeight(TBlockItem item) const final { @@ -111,6 +134,15 @@ public: out.PushNumber(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); } } + +private: + ui64 GetDataWeightImpl(int64_t dataLength) const { + ui64 size = sizeof(T) * dataLength; + if constexpr (Nullable) { + size += GetBitmaskDataWeight(dataLength); + } + return size; + } }; template<typename T, bool Nullable> @@ -132,7 +164,7 @@ public: }; template<typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal = NKikimr::NUdf::EDataSlot::String> -class TStringBlockReader final : public IBlockReader { +class TStringBlockReader final : public TBlockReaderBase { public: using TOffset = typename TStringType::offset_type; @@ -164,12 +196,11 @@ public: } ui64 GetDataWeight(const arrow::ArrayData& data) const final { - ui64 size = 0; - if constexpr (Nullable) { - size += data.length; - } - size += data.buffers[2] ? data.buffers[2]->size() : 0; - return size; + return GetDataWeightImpl(data.length, data.GetValues<TOffset>(1)); + } + + ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final { + return GetDataWeightImpl(length, data.GetValues<TOffset>(1, offset)); } ui64 GetDataWeight(TBlockItem item) const final { @@ -214,10 +245,21 @@ public: std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size()); out.PushString(str); } + +private: + ui64 GetDataWeightImpl(int64_t dataLength, const TOffset* offsets) const { + ui64 size = 0; + if constexpr (Nullable) { + size += GetBitmaskDataWeight(dataLength); + } + size += offsets[dataLength] - offsets[0]; + size += sizeof(TOffset) * dataLength; + return size; + } }; template<bool Nullable, typename TDerived> -class TTupleBlockReaderBase : public IBlockReader { +class TTupleBlockReaderBase : public TBlockReaderBase { public: TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { if constexpr (Nullable) { @@ -242,13 +284,21 @@ public: ui64 GetDataWeight(const arrow::ArrayData& data) const final { ui64 size = 0; if constexpr (Nullable) { - size += data.length; + size += GetBitmaskDataWeight(data.length); } - size += static_cast<const TDerived*>(this)->GetChildrenDataWeight(data); return size; } + ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final { + ui64 size = 0; + if constexpr (Nullable) { + size += GetBitmaskDataWeight(length); + } + size += static_cast<const TDerived*>(this)->GetChildrenDataWeight(data, offset, length); + return size; + } + ui64 GetDataWeight(TBlockItem item) const final { return static_cast<const TDerived*>(this)->GetDataWeightImpl(item); } @@ -340,6 +390,14 @@ public: return size; } + size_t GetChildrenDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const { + size_t size = 0; + for (ui32 i = 0; i < Children.size(); ++i) { + size += Children[i]->GetSliceDataWeight(*data.child_data[i], offset, length); + } + return size; + } + size_t GetChildrenDefaultDataWeight() const { size_t size = 0; for (ui32 i = 0; i < Children.size(); ++i) { @@ -393,6 +451,15 @@ public: return size; } + size_t GetChildrenDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const { + Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2); + + size_t size = 0; + size += DateReader_.GetSliceDataWeight(*data.child_data[0], offset, length); + size += TimezoneReader_.GetSliceDataWeight(*data.child_data[1], offset, length); + return size; + } + size_t GetDataWeightImpl(const TBlockItem& item) const { Y_UNUSED(item); return GetChildrenDefaultDataWeight(); @@ -427,7 +494,7 @@ private: // NOTE: For any singular type we use arrow::null() data type. // This data type DOES NOT support bit mask so for optional type // we have to use |TExternalOptional| wrapper. -class TSingularTypeBlockReader: public IBlockReader { +class TSingularTypeBlockReader: public TBlockReaderBase { public: TSingularTypeBlockReader() = default; @@ -448,6 +515,11 @@ public: return 0; } + ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final { + Y_UNUSED(data, offset, length); + return 0; + } + ui64 GetDataWeight(TBlockItem item) const override { Y_UNUSED(item); return 0; @@ -466,7 +538,7 @@ public: } }; -class TExternalOptionalBlockReader final : public IBlockReader { +class TExternalOptionalBlockReader final : public TBlockReaderBase { public: TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner) : Inner(std::move(inner)) @@ -490,7 +562,11 @@ public: } ui64 GetDataWeight(const arrow::ArrayData& data) const final { - return data.length + Inner->GetDataWeight(*data.child_data.front()); + return GetBitmaskDataWeight(data.length) + Inner->GetDataWeight(*data.child_data.front()); + } + + ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final { + return GetBitmaskDataWeight(length) + Inner->GetSliceDataWeight(*data.child_data.front(), offset, length); } ui64 GetDataWeight(TBlockItem item) const final { diff --git a/yql/essentials/public/udf/arrow/ut/block_reader_ut.cpp b/yql/essentials/public/udf/arrow/ut/block_reader_ut.cpp new file mode 100644 index 00000000000..1ac4c88d65e --- /dev/null +++ b/yql/essentials/public/udf/arrow/ut/block_reader_ut.cpp @@ -0,0 +1,133 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> +#include <yql/essentials/minikql/mkql_function_registry.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/public/udf/arrow/block_builder.h> +#include <yql/essentials/public/udf/arrow/block_reader.h> +#include <yql/essentials/public/udf/arrow/memory_pool.h> + +namespace NYql::NUdf { + +namespace { + +using namespace NKikimr; + +class TBlockReaderFixture : public NUnitTest::TBaseFixture { + class TArrayHelpers : public TThrRefBase { + public: + using TPtr = TIntrusivePtr<TArrayHelpers>; + + explicit TArrayHelpers(const NMiniKQL::TType* type, arrow::MemoryPool* const arrowPool) + : Builder(MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), type, *arrowPool, NMiniKQL::CalcBlockLen(CalcMaxBlockItemSize(type)), nullptr)) + , Reader(MakeBlockReader(NMiniKQL::TTypeInfoHelper(), type)) + {} + + public: + const std::unique_ptr<IArrayBuilder> Builder; + const std::unique_ptr<IBlockReader> Reader; + }; + +public: + TBlockReaderFixture() + : FunctionRegistry(CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry())) + , Alloc(__LOCATION__) + , Env(Alloc) + , PgmBuilder(Env, *FunctionRegistry) + , ArrowPool(GetYqlMemoryPool()) + {} + + NMiniKQL::TType* OptionaType(NMiniKQL::TType* type) const { + return PgmBuilder.NewOptionalType(type); + } + + template <typename T> + NMiniKQL::TType* DataType() const { + return PgmBuilder.NewDataType(NUdf::TDataType<T>::Id); + } + + NMiniKQL::TType* DataType(NUdf::EDataSlot dataSlot) const { + return PgmBuilder.NewDataType(dataSlot); + } + + template <typename... TArgs> + NMiniKQL::TType* TupleType(TArgs&&... args) const { + return PgmBuilder.NewTupleType({std::forward<TArgs>(args)...}); + } + + TArrayHelpers::TPtr GetArrayHelpers(const NMiniKQL::TType* type) const { + return MakeIntrusive<TArrayHelpers>(type, ArrowPool); + } + +public: + TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry; + NMiniKQL::TScopedAlloc Alloc; + NMiniKQL::TTypeEnvironment Env; + NMiniKQL::TProgramBuilder PgmBuilder; + arrow::MemoryPool* const ArrowPool; +}; + +} // anonymous namespace + +Y_UNIT_TEST_SUITE(BlockReaderTest) { + Y_UNIT_TEST_F(TestLogicalDataSize, TBlockReaderFixture) { + const std::vector arrayHelpers = { + GetArrayHelpers(DataType<ui32>()), + GetArrayHelpers(OptionaType(DataType<char*>())), + GetArrayHelpers(OptionaType(TupleType(OptionaType(DataType<ui32>()), DataType<char*>()))), + GetArrayHelpers(DataType(NUdf::EDataSlot::TzDate)), + GetArrayHelpers(PgmBuilder.NewNullType()) + }; + + constexpr ui32 size = 1000; + constexpr ui32 stringSize = 37; + for (ui32 i = 0; i < size; ++i) { + arrayHelpers[0]->Builder->Add(TBlockItem(i)); + + const auto str = NUnitTest::RandomString(stringSize, i); + arrayHelpers[1]->Builder->Add((i % 2) ? TBlockItem(str) : TBlockItem()); + + TBlockItem tuple[] = { ((i / 2) % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(str) }; + arrayHelpers[2]->Builder->Add((i % 2) ? TBlockItem(tuple) : TBlockItem()); + + TBlockItem tzDate(i); + tzDate.SetTimezoneId(i % 100); + arrayHelpers[3]->Builder->Add(tzDate); + arrayHelpers[4]->Builder->Add(TBlockItem::Zero()); + } + + std::vector<std::shared_ptr<arrow::ArrayData>> arrays; + arrays.reserve(arrayHelpers.size()); + for (const auto& helper : arrayHelpers) { + arrays.emplace_back(helper->Builder->Build(true).array()); + } + + constexpr ui32 offset = 133; + constexpr ui32 len = 533; + static_assert(offset + len < size); + + constexpr ui64 offsetSize = sizeof(arrow::BinaryType::offset_type) * len; + constexpr ui64 bitmaskSize = (len - 1) / 8 + 1; + constexpr ui64 nonEmptyStrings = (len - offset % 2) / 2 + offset % 2; + const std::vector<ui64> expectedLogicalSize = { + sizeof(ui32) * len, + bitmaskSize + offsetSize + stringSize * nonEmptyStrings, + 2 * bitmaskSize + offsetSize + sizeof(ui32) * len + stringSize * nonEmptyStrings, + (sizeof(ui16) + sizeof(ui16)) * len, + 0 + }; + + // Test GetDataWeight with offset and length + for (ui32 i = 0; i < arrayHelpers.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL_C(arrayHelpers[i]->Reader->GetSliceDataWeight(*arrays[i], offset, len), expectedLogicalSize[i], "array: " << i); + } + + // Test GetDataWeight after slize + for (ui32 i = 0; i < arrayHelpers.size(); ++i) { + const auto slice = DeepSlice(arrays[i], offset, len); + UNIT_ASSERT_VALUES_EQUAL_C(arrayHelpers[i]->Reader->GetDataWeight(*slice), expectedLogicalSize[i], "sliced array: " << i); + } + } +} + +} // namespace NYql::NUdf diff --git a/yql/essentials/public/udf/arrow/ut/ya.make b/yql/essentials/public/udf/arrow/ut/ya.make index cf4e1a802a0..037deb20fc8 100644 --- a/yql/essentials/public/udf/arrow/ut/ya.make +++ b/yql/essentials/public/udf/arrow/ut/ya.make @@ -3,6 +3,7 @@ UNITTEST() SRCS( array_builder_ut.cpp bit_util_ut.cpp + block_reader_ut.cpp ) PEERDIR( |