diff options
author | grigoriypisar <[email protected]> | 2025-03-10 16:31:58 +0300 |
---|---|---|
committer | grigoriypisar <[email protected]> | 2025-03-10 17:02:21 +0300 |
commit | 0f4aaa0912c9c1842266278b8a5d85f55884d4bc (patch) | |
tree | 8d971b896b53b046e58594bc0fa3de1dc1914d32 | |
parent | 40b5a1b2172e3284a705eeb05b3ee465257c815b (diff) |
YQL mkql blocks trimmer, added size for sub buffers
Added sub buffers size into block trimer
commit_hash:c1c708e6807c8e5b2ef7210dd13fe54216faa110
3 files changed, 122 insertions, 23 deletions
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp index 0b53f914525..0149ad4414b 100644 --- a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp +++ b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp @@ -37,6 +37,13 @@ protected: return result; } + template<typename TBuffer = NUdf::TResizeableBuffer> + std::unique_ptr<arrow::ResizableBuffer> CreateResizableBuffer(size_t size) const { + auto buffer = NUdf::AllocateResizableBuffer<TBuffer>(size, Pool_); + ARROW_OK(buffer->Resize(size, false)); + return buffer; + } + protected: arrow::MemoryPool* Pool_; }; @@ -60,7 +67,7 @@ public: auto origData = array->GetValues<TLayout>(1); auto dataSize = sizeof(TLayout) * array->length; - auto trimmedDataBuffer = NUdf::AllocateResizableBuffer(dataSize, Pool_); + auto trimmedDataBuffer = CreateResizableBuffer(dataSize); memcpy(trimmedDataBuffer->mutable_data(), origData, dataSize); return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedDataBuffer)}, array->GetNullCount()); @@ -86,8 +93,7 @@ public: auto origData = array->GetValues<NUdf::TUnboxedValue>(1); auto dataSize = sizeof(NUdf::TUnboxedValue) * array->length; - auto trimmedBuffer = NUdf::AllocateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize, Pool_); - ARROW_OK(trimmedBuffer->Resize(dataSize)); + auto trimmedBuffer = CreateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize); auto trimmedBufferData = reinterpret_cast<NUdf::TUnboxedValue*>(trimmedBuffer->mutable_data()); for (int64_t i = 0; i < array->length; i++) { @@ -131,8 +137,8 @@ public: auto origStringData = reinterpret_cast<const char*>(array->buffers[2]->data() + origOffsetData[0]); auto stringDataSize = origOffsetData[array->length] - origOffsetData[0]; - auto trimmedOffsetBuffer = NUdf::AllocateResizableBuffer(sizeof(TOffset) * (array->length + 1), Pool_); - auto trimmedStringBuffer = NUdf::AllocateResizableBuffer(stringDataSize, Pool_); + auto trimmedOffsetBuffer = CreateResizableBuffer(sizeof(TOffset) * (array->length + 1)); + auto trimmedStringBuffer = CreateResizableBuffer(stringDataSize); auto trimmedOffsetBufferData = reinterpret_cast<TOffset*>(trimmedOffsetBuffer->mutable_data()); auto trimmedStringBufferData = reinterpret_cast<char*>(trimmedStringBuffer->mutable_data()); diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp index 6c2ecb6b16d..c84261fb10c 100644 --- a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp +++ b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp @@ -13,6 +13,8 @@ using namespace NYql::NUdf; using namespace NKikimr; +namespace { + struct TBlockTrimmerTestData { TBlockTrimmerTestData() : FunctionRegistry(NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry())) @@ -32,6 +34,22 @@ struct TBlockTrimmerTestData { arrow::MemoryPool* const ArrowPool; }; +void CheckTrimmedSlice(std::shared_ptr<arrow::ArrayData> array) { + UNIT_ASSERT_VALUES_EQUAL(array->offset, 0); + for (const auto& buffer : array->buffers) { + if (buffer) { + UNIT_ASSERT_GE(buffer->size(), 1); + } + } + for (const auto& childData : array->child_data) { + if (childData) { + CheckTrimmedSlice(childData); + } + } +} + +} // anonymous namespace + Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { Y_UNIT_TEST(TestFixedSize) { TBlockTrimmerTestData data; @@ -60,6 +78,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { auto slice = Chop(array, sliceSize); auto trimmedSlice = trimmer->Trim(slice); + CheckTrimmedSlice(trimmedSlice); for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { TBlockItem lhs = reader->GetItem(*slice, elemIdx); @@ -105,6 +124,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { auto slice = Chop(array, sliceSize); auto trimmedSlice = trimmer->Trim(slice); + CheckTrimmedSlice(trimmedSlice); for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { TBlockItem lhs = reader->GetItem(*slice, elemIdx); @@ -145,6 +165,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { auto slice = Chop(array, sliceSize); auto trimmedSlice = trimmer->Trim(slice); + CheckTrimmedSlice(trimmedSlice); for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { TBlockItem lhs = reader->GetItem(*slice, elemIdx); @@ -191,6 +212,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { auto slice = Chop(array, sliceSize); auto trimmedSlice = trimmer->Trim(slice); + CheckTrimmedSlice(trimmedSlice); for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { TBlockItem lhs = reader->GetItem(*slice, elemIdx); @@ -256,6 +278,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { auto slice = Chop(array, sliceSize); auto trimmedSlice = trimmer->Trim(slice); + CheckTrimmedSlice(trimmedSlice); for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { TBlockItem lhs = reader->GetItem(*slice, elemIdx); @@ -305,6 +328,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { auto slice = Chop(array, sliceSize); auto trimmedSlice = trimmer->Trim(slice); + CheckTrimmedSlice(trimmedSlice); for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { TBlockItem lhs = reader->GetItem(*slice, elemIdx); @@ -357,6 +381,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { auto slice = Chop(array, sliceSize); auto trimmedSlice = trimmer->Trim(slice); + CheckTrimmedSlice(trimmedSlice); for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { TBlockItem lhs = reader->GetItem(*slice, elemIdx); diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp index cbff1c5722d..5bb0b980054 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -2,6 +2,8 @@ #include "mkql_computation_node_holders.h" #include "mkql_block_builder.h" #include "mkql_block_reader.h" +#include "mkql_block_trimmer.h" + #include <yql/essentials/minikql/mkql_function_registry.h> #include <yql/essentials/minikql/mkql_node.h> #include <yql/essentials/minikql/mkql_program_builder.h> @@ -655,7 +657,53 @@ protected: } } - void DoTestBlockPacking(ui64 offset, ui64 len, bool legacyStruct) { + struct TBlockTestArgs { + ui64 Offset = 0; + ui64 Len = 0; + bool LegacyStruct = false; + bool TrimBlock = false; + + TString ToString() const { + return TStringBuilder() << "Offset: " << Offset << ", Len: " << Len << ", LegacyStruct: " << LegacyStruct << ", TrimBlock: " << TrimBlock; + } + }; + + class IArgsDispatcher : public TThrRefBase { + public: + using TPtr = TIntrusivePtr<IArgsDispatcher>; + + virtual ui64 GetSize() const = 0; + virtual void Set(ui64 index) = 0; + }; + + template <typename TValue> + class TArgsDispatcher : public IArgsDispatcher { + public: + TArgsDispatcher(TValue& dst, const std::vector<TValue>& choices) + : Dst(dst) + , Choices(choices) + { + UNIT_ASSERT_C(!choices.empty(), "Choices should not be empty"); + } + + ui64 GetSize() const { + return Choices.size(); + } + + void Set(ui64 index) { + UNIT_ASSERT_LE_C(index + 1, Choices.size(), "Invalid args dispatcher index"); + Dst = Choices[index]; + } + + private: + TValue& Dst; + const std::vector<TValue> Choices; + }; + + void DoTestBlockPacking(const TBlockTestArgs& args) { + bool legacyStruct = args.LegacyStruct; + ui64 offset = args.Offset; + ui64 len = args.Len; if constexpr (Transport) { auto strType = PgmBuilder.NewDataType(NUdf::TDataType<char*>::Id); auto ui32Type = PgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id); @@ -749,6 +797,19 @@ protected: } } } + if (args.TrimBlock) { + for (ui32 index = 0; index < datums.size(); ++index) { + auto& datum = datums[index]; + if (!datum.is_array()) { + continue; + } + + const TType* columnType = legacyStruct ? static_cast<const TStructType*>(rowType)->GetMemberType(index) + : static_cast<const TMultiType*>(rowType)->GetElementType(index); + const auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), static_cast<const TBlockType*>(columnType)->GetItemType(), ArrowPool_); + datum = trimmer->Trim(datum.array()); + } + } TUnboxedValueVector columns; for (auto& datum : datums) { columns.emplace_back(HolderFactory.CreateArrowBlock(std::move(datum))); @@ -829,21 +890,34 @@ protected: } } - void TestBlockPacking() { - DoTestBlockPacking(0, 1000, false); - } - - void TestBlockPackingSliced() { - DoTestBlockPacking(19, 623, false); + void TestBlockPackingCases(TBlockTestArgs& args, std::vector<typename IArgsDispatcher::TPtr> dispatchers = {}) { + ui64 numberCases = 1; + for (const auto& dispatcher : dispatchers) { + numberCases *= dispatcher->GetSize(); + } + for (ui64 i = 0; i < numberCases; ++i) { + ui64 caseId = i; + for (const auto& dispatcher : dispatchers) { + dispatcher->Set(caseId % dispatcher->GetSize()); + caseId /= dispatcher->GetSize(); + } + Cerr << "Run block packing test case: " << args.ToString() << "\n"; + DoTestBlockPacking(args); + } } - void TestLegacyBlockPacking() { - DoTestBlockPacking(0, 1000, true); + void TestBlockPacking() { + TBlockTestArgs args; + TestBlockPackingCases(args, { + MakeIntrusive<TArgsDispatcher<TBlockTestArgs>>(args, std::vector<TBlockTestArgs>{ + {.Offset = 0, .Len = 1000}, + {.Offset = 19, .Len = 623} + }), + MakeIntrusive<TArgsDispatcher<bool>>(args.LegacyStruct, std::vector<bool>{false, true}), + MakeIntrusive<TArgsDispatcher<bool>>(args.TrimBlock, std::vector<bool>{false, true}) + }); } - void TestLegacyBlockPackingSliced() { - DoTestBlockPacking(19, 623, true); - } private: TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry; TIntrusivePtr<IRandomProvider> RandomProvider; @@ -921,9 +995,6 @@ class TMiniKQLComputationNodeTransportPackTest: public TMiniKQLComputationNodePa UNIT_TEST(TestRopeSplit); UNIT_TEST(TestIncrementalPacking); UNIT_TEST(TestBlockPacking); - UNIT_TEST(TestBlockPackingSliced); - UNIT_TEST(TestLegacyBlockPacking); - UNIT_TEST(TestLegacyBlockPackingSliced); UNIT_TEST_SUITE_END(); }; @@ -949,9 +1020,6 @@ class TMiniKQLComputationNodeTransportFastPackTest: public TMiniKQLComputationNo UNIT_TEST(TestRopeSplit); UNIT_TEST(TestIncrementalPacking); UNIT_TEST(TestBlockPacking); - UNIT_TEST(TestBlockPackingSliced); - UNIT_TEST(TestLegacyBlockPacking); - UNIT_TEST(TestLegacyBlockPackingSliced); UNIT_TEST_SUITE_END(); }; |