aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@yandex-team.com>2025-03-20 20:49:43 +0300
committergrigoriypisar <grigoriypisar@yandex-team.com>2025-03-20 21:12:37 +0300
commit74e00a576ebbd4d83e0d2c7ac1a4be5466cfd042 (patch)
tree9e0c4648b6bd6a61a1a8795e791068b59eb45cda
parent2240a7baf6aedfac97f955f5a998c34d135afc43 (diff)
downloadydb-74e00a576ebbd4d83e0d2c7ac1a4be5466cfd042.tar.gz
YQL supported LogicalDataSize in block reader
Supported LogicalDataSize in block reader commit_hash:db90c1c9c8065e4e1f41ec0fd9cfb7bc992efce2
-rw-r--r--yql/essentials/public/udf/arrow/block_reader.h112
-rw-r--r--yql/essentials/public/udf/arrow/ut/block_reader_ut.cpp133
-rw-r--r--yql/essentials/public/udf/arrow/ut/ya.make1
3 files changed, 228 insertions, 18 deletions
diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h
index 6652df2ac67..dc3e7be87b3 100644
--- a/yql/essentials/public/udf/arrow/block_reader.h
+++ b/yql/essentials/public/udf/arrow/block_reader.h
@@ -20,6 +20,7 @@ public:
virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0;
virtual ui64 GetDataWeight(const arrow::ArrayData& data) const = 0;
+ virtual ui64 GetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const = 0;
virtual ui64 GetDataWeight(TBlockItem item) const = 0;
virtual ui64 GetDefaultValueWeight() const = 0;
@@ -33,8 +34,28 @@ struct TBlockItemSerializeProps {
bool IsFixed = true; // true if each block item takes fixed size
};
+// Common base for block readers: validates the requested slice once and
+// forwards the actual computation to DoGetSliceDataWeight() of the reader.
+class TBlockReaderBase : public IBlockReader {
+public:
+    // Returns the weight of |length| items of |data| starting at item |offset|.
+    // The request is rejected (via Y_ENSURE) unless 0 <= offset < data.length,
+    // length >= 0 and the slice ends within data.length.
+    // NOTE(review): an empty slice at offset == data.length (including any slice
+    // of an empty array) is rejected by the first check - confirm this is intended.
+    ui64 GetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final {
+        Y_ENSURE(0 <= offset && offset < data.length);
+        // Compare against the remaining item count instead of computing
+        // offset + length: that sum can overflow int64_t, which is undefined
+        // behaviour for signed types and may let a bogus length pass validation.
+        // data.length - offset cannot overflow because 0 <= offset < data.length.
+        Y_ENSURE(0 <= length);
+        Y_ENSURE(length <= data.length - offset);
+        return DoGetSliceDataWeight(data, offset, length);
+    }
+
+protected:
+    // Reader-specific slice weight; called only after the bounds checks above passed.
+    virtual ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const = 0;
+
+    // Number of bytes needed to hold one bit per item, rounded up; 0 for non-positive lengths.
+    static ui64 GetBitmaskDataWeight(int64_t dataLength) {
+        if (dataLength <= 0) {
+            return 0;
+        }
+        return (dataLength - 1) / 8 + 1;
+    }
+};
+
template<typename T, bool Nullable, typename TDerived>
-class TFixedSizeBlockReaderBase : public IBlockReader {
+class TFixedSizeBlockReaderBase : public TBlockReaderBase {
public:
TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
if constexpr (Nullable) {
@@ -66,10 +87,12 @@ public:
}
ui64 GetDataWeight(const arrow::ArrayData& data) const final {
- if constexpr (Nullable) {
- return (1 + sizeof(T)) * data.length;
- }
- return sizeof(T) * data.length;
+ return GetDataWeightImpl(data.length);
+ }
+
+ // Items have a fixed size, so the slice weight depends only on the item count;
+ // the array contents and the start position are not consulted.
+ ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final {
+     Y_UNUSED(data);
+     Y_UNUSED(offset);
+     return GetDataWeightImpl(length);
+ }
ui64 GetDataWeight(TBlockItem item) const final {
@@ -111,6 +134,15 @@ public:
out.PushNumber(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
}
}
+
+private:
+ // Weight of |dataLength| fixed-size values: sizeof(T) per value plus,
+ // for nullable readers, one bit per value rounded up to whole bytes.
+ ui64 GetDataWeightImpl(int64_t dataLength) const {
+     const ui64 valuesWeight = sizeof(T) * dataLength;
+     if constexpr (Nullable) {
+         return valuesWeight + GetBitmaskDataWeight(dataLength);
+     } else {
+         return valuesWeight;
+     }
+ }
};
template<typename T, bool Nullable>
@@ -132,7 +164,7 @@ public:
};
template<typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot TOriginal = NKikimr::NUdf::EDataSlot::String>
-class TStringBlockReader final : public IBlockReader {
+class TStringBlockReader final : public TBlockReaderBase {
public:
using TOffset = typename TStringType::offset_type;
@@ -164,12 +196,11 @@ public:
}
ui64 GetDataWeight(const arrow::ArrayData& data) const final {
- ui64 size = 0;
- if constexpr (Nullable) {
- size += data.length;
- }
- size += data.buffers[2] ? data.buffers[2]->size() : 0;
- return size;
+ return GetDataWeightImpl(data.length, data.GetValues<TOffset>(1));
+ }
+
+ // Weight of |length| strings starting at logical item |offset| of |data|.
+ // The two-argument ArrayData::GetValues(i, absolute_offset) does NOT add
+ // data.offset (only the one-argument overload used by GetDataWeight() does),
+ // so it is applied here explicitly; otherwise an array with a non-zero
+ // data.offset would be measured from the wrong position in the offsets buffer.
+ ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final {
+     return GetDataWeightImpl(length, data.GetValues<TOffset>(1, data.offset + offset));
+ }
ui64 GetDataWeight(TBlockItem item) const final {
@@ -214,10 +245,21 @@ public:
std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size());
out.PushString(str);
}
+
+private:
+ // Weight of |dataLength| strings described by |offsets|: the bytes spanned
+ // between the first and the last offset, sizeof(TOffset) per item and,
+ // for nullable readers, the bitmask. Reads offsets[0] and offsets[dataLength],
+ // so |offsets| must point at no fewer than dataLength + 1 entries.
+ ui64 GetDataWeightImpl(int64_t dataLength, const TOffset* offsets) const {
+     const ui64 payloadWeight = offsets[dataLength] - offsets[0];
+     const ui64 offsetsWeight = sizeof(TOffset) * dataLength;
+     ui64 total = payloadWeight + offsetsWeight;
+     if constexpr (Nullable) {
+         total += GetBitmaskDataWeight(dataLength);
+     }
+     return total;
+ }
};
template<bool Nullable, typename TDerived>
-class TTupleBlockReaderBase : public IBlockReader {
+class TTupleBlockReaderBase : public TBlockReaderBase {
public:
TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
if constexpr (Nullable) {
@@ -242,13 +284,21 @@ public:
ui64 GetDataWeight(const arrow::ArrayData& data) const final {
ui64 size = 0;
if constexpr (Nullable) {
- size += data.length;
+ size += GetBitmaskDataWeight(data.length);
}
-
size += static_cast<const TDerived*>(this)->GetChildrenDataWeight(data);
return size;
}
+ // Slice weight of a tuple: the children weights reported by the derived
+ // reader plus, for nullable tuples, the tuple's own bitmask.
+ ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final {
+     const ui64 childrenWeight = static_cast<const TDerived*>(this)->GetChildrenDataWeight(data, offset, length);
+     if constexpr (Nullable) {
+         return GetBitmaskDataWeight(length) + childrenWeight;
+     } else {
+         return childrenWeight;
+     }
+ }
+
+
ui64 GetDataWeight(TBlockItem item) const final {
return static_cast<const TDerived*>(this)->GetDataWeightImpl(item);
}
@@ -340,6 +390,14 @@ public:
return size;
}
+ // Sums the slice weights of all children; child i is measured on
+ // data.child_data[i] with the same |offset| and |length| as the tuple.
+ size_t GetChildrenDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const {
+     const size_t childCount = Children.size();
+     size_t total = 0;
+     for (ui32 idx = 0; idx < childCount; ++idx) {
+         const auto& childData = *data.child_data[idx];
+         total += Children[idx]->GetSliceDataWeight(childData, offset, length);
+     }
+     return total;
+ }
+
+
size_t GetChildrenDefaultDataWeight() const {
size_t size = 0;
for (ui32 i = 0; i < Children.size(); ++i) {
@@ -393,6 +451,15 @@ public:
return size;
}
+ // Slice weight of the two children: child 0 is measured by DateReader_,
+ // child 1 by TimezoneReader_, both over the same |offset| and |length|.
+ size_t GetChildrenDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const {
+     Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
+
+     const size_t dateWeight = DateReader_.GetSliceDataWeight(*data.child_data[0], offset, length);
+     const size_t timezoneWeight = TimezoneReader_.GetSliceDataWeight(*data.child_data[1], offset, length);
+     return dateWeight + timezoneWeight;
+ }
+
+
size_t GetDataWeightImpl(const TBlockItem& item) const {
Y_UNUSED(item);
return GetChildrenDefaultDataWeight();
@@ -427,7 +494,7 @@ private:
// NOTE: For any singular type we use arrow::null() data type.
// This data type DOES NOT support bit mask so for optional type
// we have to use |TExternalOptional| wrapper.
-class TSingularTypeBlockReader: public IBlockReader {
+class TSingularTypeBlockReader: public TBlockReaderBase {
public:
TSingularTypeBlockReader() = default;
@@ -448,6 +515,11 @@ public:
return 0;
}
+ // Any slice of a singular-type array weighs nothing, whatever the arguments.
+ ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final {
+     Y_UNUSED(data);
+     Y_UNUSED(offset);
+     Y_UNUSED(length);
+     return 0;
+ }
+
ui64 GetDataWeight(TBlockItem item) const override {
Y_UNUSED(item);
return 0;
@@ -466,7 +538,7 @@ public:
}
};
-class TExternalOptionalBlockReader final : public IBlockReader {
+class TExternalOptionalBlockReader final : public TBlockReaderBase {
public:
TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
: Inner(std::move(inner))
@@ -490,7 +562,11 @@ public:
}
ui64 GetDataWeight(const arrow::ArrayData& data) const final {
- return data.length + Inner->GetDataWeight(*data.child_data.front());
+ return GetBitmaskDataWeight(data.length) + Inner->GetDataWeight(*data.child_data.front());
+ }
+
+ // Slice weight of an external optional: its own bitmask plus the weight of
+ // the same slice of the wrapped array (the first child), measured by Inner.
+ ui64 DoGetSliceDataWeight(const arrow::ArrayData& data, int64_t offset, int64_t length) const final {
+     const ui64 bitmaskWeight = GetBitmaskDataWeight(length);
+     const ui64 innerWeight = Inner->GetSliceDataWeight(*data.child_data.front(), offset, length);
+     return bitmaskWeight + innerWeight;
+ }
ui64 GetDataWeight(TBlockItem item) const final {
diff --git a/yql/essentials/public/udf/arrow/ut/block_reader_ut.cpp b/yql/essentials/public/udf/arrow/ut/block_reader_ut.cpp
new file mode 100644
index 00000000000..1ac4c88d65e
--- /dev/null
+++ b/yql/essentials/public/udf/arrow/ut/block_reader_ut.cpp
@@ -0,0 +1,133 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/public/udf/arrow/block_builder.h>
+#include <yql/essentials/public/udf/arrow/block_reader.h>
+#include <yql/essentials/public/udf/arrow/memory_pool.h>
+
+namespace NYql::NUdf {
+
+namespace {
+
+using namespace NKikimr;
+
+// Test fixture that owns the MiniKQL objects used to create types and offers
+// shortcuts for building types and matching builder/reader pairs.
+class TBlockReaderFixture : public NUnitTest::TBaseFixture {
+ // Ref-counted pair of an array builder and a block reader created for the same type.
+ class TArrayHelpers : public TThrRefBase {
+ public:
+ using TPtr = TIntrusivePtr<TArrayHelpers>;
+
+ // Creates the builder (allocating from |arrowPool|) and the reader for |type|.
+ explicit TArrayHelpers(const NMiniKQL::TType* type, arrow::MemoryPool* const arrowPool)
+ : Builder(MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), type, *arrowPool, NMiniKQL::CalcBlockLen(CalcMaxBlockItemSize(type)), nullptr))
+ , Reader(MakeBlockReader(NMiniKQL::TTypeInfoHelper(), type))
+ {}
+
+ public:
+ const std::unique_ptr<IArrayBuilder> Builder;
+ const std::unique_ptr<IBlockReader> Reader;
+ };
+
+public:
+ // Members are initialized in declaration order (see the member list below):
+ // Env is built from Alloc, PgmBuilder from Env and FunctionRegistry.
+ TBlockReaderFixture()
+ : FunctionRegistry(CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry()))
+ , Alloc(__LOCATION__)
+ , Env(Alloc)
+ , PgmBuilder(Env, *FunctionRegistry)
+ , ArrowPool(GetYqlMemoryPool())
+ {}
+
+ // Wraps |type| into an optional type.
+ // NOTE: the name is spelled "OptionaType" (sic); it is kept because the test body calls it.
+ NMiniKQL::TType* OptionaType(NMiniKQL::TType* type) const {
+ return PgmBuilder.NewOptionalType(type);
+ }
+
+ // Data type identified by the C++ type T (via NUdf::TDataType<T>::Id).
+ template <typename T>
+ NMiniKQL::TType* DataType() const {
+ return PgmBuilder.NewDataType(NUdf::TDataType<T>::Id);
+ }
+
+ // Data type identified by an explicit data slot.
+ NMiniKQL::TType* DataType(NUdf::EDataSlot dataSlot) const {
+ return PgmBuilder.NewDataType(dataSlot);
+ }
+
+ // Tuple type whose element types are |args|, in order.
+ template <typename... TArgs>
+ NMiniKQL::TType* TupleType(TArgs&&... args) const {
+ return PgmBuilder.NewTupleType({std::forward<TArgs>(args)...});
+ }
+
+ // Builder/reader pair for |type|; the builder allocates from ArrowPool.
+ TArrayHelpers::TPtr GetArrayHelpers(const NMiniKQL::TType* type) const {
+ return MakeIntrusive<TArrayHelpers>(type, ArrowPool);
+ }
+
+public:
+ // Keep this order: the constructor's initializers depend on earlier members.
+ TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry;
+ NMiniKQL::TScopedAlloc Alloc;
+ NMiniKQL::TTypeEnvironment Env;
+ NMiniKQL::TProgramBuilder PgmBuilder;
+ arrow::MemoryPool* const ArrowPool;
+};
+
+} // anonymous namespace
+
+Y_UNIT_TEST_SUITE(BlockReaderTest) {
+ // Checks that the weight of a slice [offset, offset + len) matches the expected
+ // logical size, both through GetSliceDataWeight() on the full array and through
+ // GetDataWeight() on a DeepSlice() of it.
+ Y_UNIT_TEST_F(TestLogicalDataSize, TBlockReaderFixture) {
+ // One builder/reader pair per tested type; indices 0..4 are referenced below.
+ const std::vector arrayHelpers = {
+ GetArrayHelpers(DataType<ui32>()),
+ GetArrayHelpers(OptionaType(DataType<char*>())),
+ GetArrayHelpers(OptionaType(TupleType(OptionaType(DataType<ui32>()), DataType<char*>()))),
+ GetArrayHelpers(DataType(NUdf::EDataSlot::TzDate)),
+ GetArrayHelpers(PgmBuilder.NewNullType())
+ };
+
+ constexpr ui32 size = 1000;
+ constexpr ui32 stringSize = 37;
+ for (ui32 i = 0; i < size; ++i) {
+ arrayHelpers[0]->Builder->Add(TBlockItem(i));
+
+ // Odd i gets a string of stringSize characters, even i gets an empty item.
+ const auto str = NUnitTest::RandomString(stringSize, i);
+ arrayHelpers[1]->Builder->Add((i % 2) ? TBlockItem(str) : TBlockItem());
+
+ // The tuple itself is present only for odd i; its first element alternates in pairs of i.
+ TBlockItem tuple[] = { ((i / 2) % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(str) };
+ arrayHelpers[2]->Builder->Add((i % 2) ? TBlockItem(tuple) : TBlockItem());
+
+ TBlockItem tzDate(i);
+ tzDate.SetTimezoneId(i % 100);
+ arrayHelpers[3]->Builder->Add(tzDate);
+ arrayHelpers[4]->Builder->Add(TBlockItem::Zero());
+ }
+
+ // NOTE(review): the arrays come straight from the builders, presumably with
+ // ArrayData::offset == 0; inputs with a non-zero offset are not covered here.
+ std::vector<std::shared_ptr<arrow::ArrayData>> arrays;
+ arrays.reserve(arrayHelpers.size());
+ for (const auto& helper : arrayHelpers) {
+ arrays.emplace_back(helper->Builder->Build(true).array());
+ }
+
+ constexpr ui32 offset = 133;
+ constexpr ui32 len = 533;
+ static_assert(offset + len < size);
+
+ constexpr ui64 offsetSize = sizeof(arrow::BinaryType::offset_type) * len;
+ // One bit per item, rounded up to whole bytes.
+ constexpr ui64 bitmaskSize = (len - 1) / 8 + 1;
+ // Number of odd indices in [offset, offset + len), i.e. items that received a string.
+ constexpr ui64 nonEmptyStrings = (len - offset % 2) / 2 + offset % 2;
+ // Expected sizes, in the same order as arrayHelpers.
+ const std::vector<ui64> expectedLogicalSize = {
+ sizeof(ui32) * len,
+ bitmaskSize + offsetSize + stringSize * nonEmptyStrings,
+ 2 * bitmaskSize + offsetSize + sizeof(ui32) * len + stringSize * nonEmptyStrings,
+ (sizeof(ui16) + sizeof(ui16)) * len,
+ 0
+ };
+
+ // Test GetSliceDataWeight with offset and length
+ for (ui32 i = 0; i < arrayHelpers.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL_C(arrayHelpers[i]->Reader->GetSliceDataWeight(*arrays[i], offset, len), expectedLogicalSize[i], "array: " << i);
+ }
+
+ // Test GetDataWeight after slice
+ for (ui32 i = 0; i < arrayHelpers.size(); ++i) {
+ const auto slice = DeepSlice(arrays[i], offset, len);
+ UNIT_ASSERT_VALUES_EQUAL_C(arrayHelpers[i]->Reader->GetDataWeight(*slice), expectedLogicalSize[i], "sliced array: " << i);
+ }
+ }
+}
+
+} // namespace NYql::NUdf
diff --git a/yql/essentials/public/udf/arrow/ut/ya.make b/yql/essentials/public/udf/arrow/ut/ya.make
index cf4e1a802a0..037deb20fc8 100644
--- a/yql/essentials/public/udf/arrow/ut/ya.make
+++ b/yql/essentials/public/udf/arrow/ut/ya.make
@@ -3,6 +3,7 @@ UNITTEST()
SRCS(
array_builder_ut.cpp
bit_util_ut.cpp
+ block_reader_ut.cpp
)
PEERDIR(