summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author grigoriypisar <[email protected]> 2025-03-10 16:31:58 +0300
committer grigoriypisar <[email protected]> 2025-03-10 17:02:21 +0300
commit 0f4aaa0912c9c1842266278b8a5d85f55884d4bc (patch)
tree 8d971b896b53b046e58594bc0fa3de1dc1914d32
parent 40b5a1b2172e3284a705eeb05b3ee465257c815b (diff)
YQL mkql blocks trimmer, added size for sub buffers
Added sub buffers size into block trimmer
commit_hash:c1c708e6807c8e5b2ef7210dd13fe54216faa110
-rw-r--r-- yql/essentials/minikql/computation/mkql_block_trimmer.cpp 16
-rw-r--r-- yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp 25
-rw-r--r-- yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp 104
3 files changed, 122 insertions, 23 deletions
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
index 0b53f914525..0149ad4414b 100644
--- a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
@@ -37,6 +37,13 @@ protected:
return result;
}
+ template<typename TBuffer = NUdf::TResizeableBuffer>
+ std::unique_ptr<arrow::ResizableBuffer> CreateResizableBuffer(size_t size) const {
+ auto buffer = NUdf::AllocateResizableBuffer<TBuffer>(size, Pool_);
+ ARROW_OK(buffer->Resize(size, false));
+ return buffer;
+ }
+
protected:
arrow::MemoryPool* Pool_;
};
@@ -60,7 +67,7 @@ public:
auto origData = array->GetValues<TLayout>(1);
auto dataSize = sizeof(TLayout) * array->length;
- auto trimmedDataBuffer = NUdf::AllocateResizableBuffer(dataSize, Pool_);
+ auto trimmedDataBuffer = CreateResizableBuffer(dataSize);
memcpy(trimmedDataBuffer->mutable_data(), origData, dataSize);
return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedDataBuffer)}, array->GetNullCount());
@@ -86,8 +93,7 @@ public:
auto origData = array->GetValues<NUdf::TUnboxedValue>(1);
auto dataSize = sizeof(NUdf::TUnboxedValue) * array->length;
- auto trimmedBuffer = NUdf::AllocateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize, Pool_);
- ARROW_OK(trimmedBuffer->Resize(dataSize));
+ auto trimmedBuffer = CreateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize);
auto trimmedBufferData = reinterpret_cast<NUdf::TUnboxedValue*>(trimmedBuffer->mutable_data());
for (int64_t i = 0; i < array->length; i++) {
@@ -131,8 +137,8 @@ public:
auto origStringData = reinterpret_cast<const char*>(array->buffers[2]->data() + origOffsetData[0]);
auto stringDataSize = origOffsetData[array->length] - origOffsetData[0];
- auto trimmedOffsetBuffer = NUdf::AllocateResizableBuffer(sizeof(TOffset) * (array->length + 1), Pool_);
- auto trimmedStringBuffer = NUdf::AllocateResizableBuffer(stringDataSize, Pool_);
+ auto trimmedOffsetBuffer = CreateResizableBuffer(sizeof(TOffset) * (array->length + 1));
+ auto trimmedStringBuffer = CreateResizableBuffer(stringDataSize);
auto trimmedOffsetBufferData = reinterpret_cast<TOffset*>(trimmedOffsetBuffer->mutable_data());
auto trimmedStringBufferData = reinterpret_cast<char*>(trimmedStringBuffer->mutable_data());
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp
index 6c2ecb6b16d..c84261fb10c 100644
--- a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp
@@ -13,6 +13,8 @@
using namespace NYql::NUdf;
using namespace NKikimr;
+namespace {
+
struct TBlockTrimmerTestData {
TBlockTrimmerTestData()
: FunctionRegistry(NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry()))
@@ -32,6 +34,22 @@ struct TBlockTrimmerTestData {
arrow::MemoryPool* const ArrowPool;
};
+void CheckTrimmedSlice(std::shared_ptr<arrow::ArrayData> array) {
+ UNIT_ASSERT_VALUES_EQUAL(array->offset, 0);
+ for (const auto& buffer : array->buffers) {
+ if (buffer) {
+ UNIT_ASSERT_GE(buffer->size(), 1);
+ }
+ }
+ for (const auto& childData : array->child_data) {
+ if (childData) {
+ CheckTrimmedSlice(childData);
+ }
+ }
+}
+
+} // anonymous namespace
+
Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
Y_UNIT_TEST(TestFixedSize) {
TBlockTrimmerTestData data;
@@ -60,6 +78,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
auto slice = Chop(array, sliceSize);
auto trimmedSlice = trimmer->Trim(slice);
+ CheckTrimmedSlice(trimmedSlice);
for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
TBlockItem lhs = reader->GetItem(*slice, elemIdx);
@@ -105,6 +124,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
auto slice = Chop(array, sliceSize);
auto trimmedSlice = trimmer->Trim(slice);
+ CheckTrimmedSlice(trimmedSlice);
for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
TBlockItem lhs = reader->GetItem(*slice, elemIdx);
@@ -145,6 +165,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
auto slice = Chop(array, sliceSize);
auto trimmedSlice = trimmer->Trim(slice);
+ CheckTrimmedSlice(trimmedSlice);
for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
TBlockItem lhs = reader->GetItem(*slice, elemIdx);
@@ -191,6 +212,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
auto slice = Chop(array, sliceSize);
auto trimmedSlice = trimmer->Trim(slice);
+ CheckTrimmedSlice(trimmedSlice);
for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
TBlockItem lhs = reader->GetItem(*slice, elemIdx);
@@ -256,6 +278,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
auto slice = Chop(array, sliceSize);
auto trimmedSlice = trimmer->Trim(slice);
+ CheckTrimmedSlice(trimmedSlice);
for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
TBlockItem lhs = reader->GetItem(*slice, elemIdx);
@@ -305,6 +328,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
auto slice = Chop(array, sliceSize);
auto trimmedSlice = trimmer->Trim(slice);
+ CheckTrimmedSlice(trimmedSlice);
for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
TBlockItem lhs = reader->GetItem(*slice, elemIdx);
@@ -357,6 +381,7 @@ Y_UNIT_TEST_SUITE(TBlockTrimmerTest) {
for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) {
auto slice = Chop(array, sliceSize);
auto trimmedSlice = trimmer->Trim(slice);
+ CheckTrimmedSlice(trimmedSlice);
for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) {
TBlockItem lhs = reader->GetItem(*slice, elemIdx);
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
index cbff1c5722d..5bb0b980054 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -2,6 +2,8 @@
#include "mkql_computation_node_holders.h"
#include "mkql_block_builder.h"
#include "mkql_block_reader.h"
+#include "mkql_block_trimmer.h"
+
#include <yql/essentials/minikql/mkql_function_registry.h>
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/minikql/mkql_program_builder.h>
@@ -655,7 +657,53 @@ protected:
}
}
- void DoTestBlockPacking(ui64 offset, ui64 len, bool legacyStruct) {
+ struct TBlockTestArgs {
+ ui64 Offset = 0;
+ ui64 Len = 0;
+ bool LegacyStruct = false;
+ bool TrimBlock = false;
+
+ TString ToString() const {
+ return TStringBuilder() << "Offset: " << Offset << ", Len: " << Len << ", LegacyStruct: " << LegacyStruct << ", TrimBlock: " << TrimBlock;
+ }
+ };
+
+ class IArgsDispatcher : public TThrRefBase {
+ public:
+ using TPtr = TIntrusivePtr<IArgsDispatcher>;
+
+ virtual ui64 GetSize() const = 0;
+ virtual void Set(ui64 index) = 0;
+ };
+
+ template <typename TValue>
+ class TArgsDispatcher : public IArgsDispatcher {
+ public:
+ TArgsDispatcher(TValue& dst, const std::vector<TValue>& choices)
+ : Dst(dst)
+ , Choices(choices)
+ {
+ UNIT_ASSERT_C(!choices.empty(), "Choices should not be empty");
+ }
+
+ ui64 GetSize() const {
+ return Choices.size();
+ }
+
+ void Set(ui64 index) {
+ UNIT_ASSERT_LE_C(index + 1, Choices.size(), "Invalid args dispatcher index");
+ Dst = Choices[index];
+ }
+
+ private:
+ TValue& Dst;
+ const std::vector<TValue> Choices;
+ };
+
+ void DoTestBlockPacking(const TBlockTestArgs& args) {
+ bool legacyStruct = args.LegacyStruct;
+ ui64 offset = args.Offset;
+ ui64 len = args.Len;
if constexpr (Transport) {
auto strType = PgmBuilder.NewDataType(NUdf::TDataType<char*>::Id);
auto ui32Type = PgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id);
@@ -749,6 +797,19 @@ protected:
}
}
}
+ if (args.TrimBlock) {
+ for (ui32 index = 0; index < datums.size(); ++index) {
+ auto& datum = datums[index];
+ if (!datum.is_array()) {
+ continue;
+ }
+
+ const TType* columnType = legacyStruct ? static_cast<const TStructType*>(rowType)->GetMemberType(index)
+ : static_cast<const TMultiType*>(rowType)->GetElementType(index);
+ const auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), static_cast<const TBlockType*>(columnType)->GetItemType(), ArrowPool_);
+ datum = trimmer->Trim(datum.array());
+ }
+ }
TUnboxedValueVector columns;
for (auto& datum : datums) {
columns.emplace_back(HolderFactory.CreateArrowBlock(std::move(datum)));
@@ -829,21 +890,34 @@ protected:
}
}
- void TestBlockPacking() {
- DoTestBlockPacking(0, 1000, false);
- }
-
- void TestBlockPackingSliced() {
- DoTestBlockPacking(19, 623, false);
+ void TestBlockPackingCases(TBlockTestArgs& args, std::vector<typename IArgsDispatcher::TPtr> dispatchers = {}) {
+ ui64 numberCases = 1;
+ for (const auto& dispatcher : dispatchers) {
+ numberCases *= dispatcher->GetSize();
+ }
+ for (ui64 i = 0; i < numberCases; ++i) {
+ ui64 caseId = i;
+ for (const auto& dispatcher : dispatchers) {
+ dispatcher->Set(caseId % dispatcher->GetSize());
+ caseId /= dispatcher->GetSize();
+ }
+ Cerr << "Run block packing test case: " << args.ToString() << "\n";
+ DoTestBlockPacking(args);
+ }
}
- void TestLegacyBlockPacking() {
- DoTestBlockPacking(0, 1000, true);
+ void TestBlockPacking() {
+ TBlockTestArgs args;
+ TestBlockPackingCases(args, {
+ MakeIntrusive<TArgsDispatcher<TBlockTestArgs>>(args, std::vector<TBlockTestArgs>{
+ {.Offset = 0, .Len = 1000},
+ {.Offset = 19, .Len = 623}
+ }),
+ MakeIntrusive<TArgsDispatcher<bool>>(args.LegacyStruct, std::vector<bool>{false, true}),
+ MakeIntrusive<TArgsDispatcher<bool>>(args.TrimBlock, std::vector<bool>{false, true})
+ });
}
- void TestLegacyBlockPackingSliced() {
- DoTestBlockPacking(19, 623, true);
- }
private:
TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry;
TIntrusivePtr<IRandomProvider> RandomProvider;
@@ -921,9 +995,6 @@ class TMiniKQLComputationNodeTransportPackTest: public TMiniKQLComputationNodePa
UNIT_TEST(TestRopeSplit);
UNIT_TEST(TestIncrementalPacking);
UNIT_TEST(TestBlockPacking);
- UNIT_TEST(TestBlockPackingSliced);
- UNIT_TEST(TestLegacyBlockPacking);
- UNIT_TEST(TestLegacyBlockPackingSliced);
UNIT_TEST_SUITE_END();
};
@@ -949,9 +1020,6 @@ class TMiniKQLComputationNodeTransportFastPackTest: public TMiniKQLComputationNo
UNIT_TEST(TestRopeSplit);
UNIT_TEST(TestIncrementalPacking);
UNIT_TEST(TestBlockPacking);
- UNIT_TEST(TestBlockPackingSliced);
- UNIT_TEST(TestLegacyBlockPacking);
- UNIT_TEST(TestLegacyBlockPackingSliced);
UNIT_TEST_SUITE_END();
};