diff options
author | ilezhankin <[email protected]> | 2025-06-16 11:20:01 +0300 |
---|---|---|
committer | ilezhankin <[email protected]> | 2025-06-16 12:14:12 +0300 |
commit | 2779e9c86ed4699ecbc0ca2fde8d7bcb4cf7da5c (patch) | |
tree | cd2716c52737851540ca909d8c9bab35a31bde6f /yql/essentials/minikql/computation | |
parent | c08395814b322c9dd280031bb20cdb1750467635 (diff) |
Allow overriding the default buffer page size in TPagedBuffer
Благодаря недавно добавленной метрике про wasted-память стало понятно, что текущий универсальный размер страниц в `TPagedBuffer` слишком велик. В этом ПР добавляется возможность устанавливать размер страниц снаружи — отдельно для каждого объекта `TPagedBuffer`.
commit_hash:f82fa4e6162a129bb4442f18657c32b074cb07f7
Diffstat (limited to 'yql/essentials/minikql/computation')
3 files changed, 13 insertions, 8 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp index ace74a62df9..95e2148e728 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp @@ -1067,8 +1067,9 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value) // Transport packer template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) +TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) : Type_(type) + , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool()) @@ -1078,8 +1079,9 @@ TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* typ } template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) +TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) : Type_(type) + , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) , ArrowPool_(pool ? 
*pool : *NYql::NUdf::GetYqlMemoryPool()) @@ -1146,7 +1148,7 @@ template<bool Fast> TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const { MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls"); MKQL_ENSURE(!IsBlock_, "Pack() should not be used for blocks"); - TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>(); + TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>(BufferPageAllocSize_); if constexpr (Fast) { PackImpl<Fast, false>(Type_, *result, value, State_); } else { @@ -1160,7 +1162,7 @@ TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& v template<bool Fast> void TValuePackerTransport<Fast>::StartPack() { - Buffer_ = std::make_shared<TPagedBuffer>(); + Buffer_ = std::make_shared<TPagedBuffer>(BufferPageAllocSize_); if constexpr (Fast) { // reserve place for list item count Buffer_->ReserveHeader(sizeof(ItemCount_)); @@ -1442,7 +1444,7 @@ void TValuePackerTransport<Fast>::BuildMeta(TPagedBuffer::TPtr& buffer, bool add } else { s.OptionalMaskReserve = maskSize; - TPagedBuffer::TPtr resultBuffer = std::make_shared<TPagedBuffer>(); + TPagedBuffer::TPtr resultBuffer = std::make_shared<TPagedBuffer>(BufferPageAllocSize_); SerializeMeta(*resultBuffer, useMask, s.OptionalUsageMask, fullLen, s.Properties.Test(EPackProps::SingleOptional)); if (addItemCount) { PackData<Fast>(ItemCount_, *resultBuffer); diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.h b/yql/essentials/minikql/computation/mkql_computation_node_pack.h index 7325695a71d..12aac705bb9 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack.h +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.h @@ -75,9 +75,11 @@ class TValuePackerTransport { public: using TSelf = TValuePackerTransport<Fast>; - explicit TValuePackerTransport(const TType* type, arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); + explicit 
TValuePackerTransport(const TType* type, + TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); // for compatibility with TValuePackerGeneric - stable packing is not supported - TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); + TValuePackerTransport(bool stable, const TType* type, + TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout TSelf& AddItem(const NUdf::TUnboxedValuePod& value); @@ -115,6 +117,7 @@ private: const TType* const Type_; ui64 ItemCount_ = 0; TPagedBuffer::TPtr Buffer_; + const size_t BufferPageAllocSize_; mutable NDetails::TPackerState State_; mutable NDetails::TPackerState IncrementalState_; diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp index cc175f3b7d8..19bca68b95f 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -823,7 +823,7 @@ protected: columns.emplace_back(HolderFactory.CreateArrowBlock(std::move(datum))); } - TValuePackerType packer(false, rowType, ArrowPool_, args.MinFillPercentage); + TValuePackerType packer(false, rowType, {}, ArrowPool_, args.MinFillPercentage); if (legacyStruct) { TUnboxedValueVector columnsCopy = columns; NUdf::TUnboxedValue row = HolderFactory.VectorAsArray(columnsCopy); |