| author | ilezhankin <[email protected]> | 2025-06-16 11:20:01 +0300 |
|---|---|---|
| committer | ilezhankin <[email protected]> | 2025-06-16 12:14:12 +0300 |
| commit | 2779e9c86ed4699ecbc0ca2fde8d7bcb4cf7da5c (patch) | |
| tree | cd2716c52737851540ca909d8c9bab35a31bde6f | |
| parent | c08395814b322c9dd280031bb20cdb1750467635 (diff) | |
Allow modifying the default buffer page size in TPagedBuffer
The recently added metric for wasted memory showed that the current one-size-fits-all page size in `TPagedBuffer` is too large. This PR adds the ability to set the page size externally, separately for each `TPagedBuffer` object.
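In practice this means a `TPagedBuffer` can now be sized per object. A minimal usage sketch (the 4 KiB figure is an arbitrary illustration, not a value from this commit):

```cpp
// Sketch only: assumes the usual NKikimr::NMiniKQL namespace from mkql_buffer.h.
#include <yql/essentials/minikql/mkql_buffer.h>

using NKikimr::NMiniKQL::TPagedBuffer;

void Example() {
    // Default-constructed buffer keeps the old 128 KiB pages
    // (TBufferPage::DefaultPageAllocSize).
    TPagedBuffer byDefault;

    // New: request 4 KiB pages for this object only. The size must satisfy
    // IsValidPageAllocSize() - fit into ui32 and exceed sizeof(TBufferPage) -
    // otherwise the constructor's Y_ENSURE throws.
    TPagedBuffer smallPages(4 * 1024);

    smallPages.Append("hello", 5); // pages are allocated lazily on first append
}
```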
commit_hash:f82fa4e6162a129bb4442f18657c32b074cb07f7
5 files changed, 53 insertions, 34 deletions
```diff
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
index ace74a62df9..95e2148e728 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
@@ -1067,8 +1067,9 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value)
 
 // Transport packer
 template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
+TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
     : Type_(type)
+    , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize)
     , State_(ScanTypeProperties(Type_, false))
     , IncrementalState_(ScanTypeProperties(Type_, true))
     , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
@@ -1078,8 +1079,9 @@ TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* typ
 }
 
 template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
+TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
     : Type_(type)
+    , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize)
     , State_(ScanTypeProperties(Type_, false))
     , IncrementalState_(ScanTypeProperties(Type_, true))
     , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
@@ -1146,7 +1148,7 @@ template<bool Fast>
 TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const {
     MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls");
     MKQL_ENSURE(!IsBlock_, "Pack() should not be used for blocks");
-    TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>();
+    TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>(BufferPageAllocSize_);
     if constexpr (Fast) {
         PackImpl<Fast, false>(Type_, *result, value, State_);
     } else {
@@ -1160,7 +1162,7 @@ TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& v
 
 template<bool Fast>
 void TValuePackerTransport<Fast>::StartPack() {
-    Buffer_ = std::make_shared<TPagedBuffer>();
+    Buffer_ = std::make_shared<TPagedBuffer>(BufferPageAllocSize_);
     if constexpr (Fast) {
         // reserve place for list item count
         Buffer_->ReserveHeader(sizeof(ItemCount_));
@@ -1442,7 +1444,7 @@ void TValuePackerTransport<Fast>::BuildMeta(TPagedBuffer::TPtr& buffer, bool add
     } else {
         s.OptionalMaskReserve = maskSize;
 
-        TPagedBuffer::TPtr resultBuffer = std::make_shared<TPagedBuffer>();
+        TPagedBuffer::TPtr resultBuffer = std::make_shared<TPagedBuffer>(BufferPageAllocSize_);
         SerializeMeta(*resultBuffer, useMask, s.OptionalUsageMask, fullLen, s.Properties.Test(EPackProps::SingleOptional));
         if (addItemCount) {
             PackData<Fast>(ItemCount_, *resultBuffer);
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.h b/yql/essentials/minikql/computation/mkql_computation_node_pack.h
index 7325695a71d..12aac705bb9 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.h
@@ -75,9 +75,11 @@ class TValuePackerTransport {
 public:
     using TSelf = TValuePackerTransport<Fast>;
 
-    explicit TValuePackerTransport(const TType* type, arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+    explicit TValuePackerTransport(const TType* type,
+        TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
     // for compatibility with TValuePackerGeneric - stable packing is not supported
-    TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+    TValuePackerTransport(bool stable, const TType* type,
+        TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
 
     // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout
     TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
@@ -115,6 +117,7 @@ private:
     const TType* const Type_;
     ui64 ItemCount_ = 0;
     TPagedBuffer::TPtr Buffer_;
+    const size_t BufferPageAllocSize_;
 
     mutable NDetails::TPackerState State_;
     mutable NDetails::TPackerState IncrementalState_;
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
index cc175f3b7d8..19bca68b95f 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -823,7 +823,7 @@ protected:
             columns.emplace_back(HolderFactory.CreateArrowBlock(std::move(datum)));
         }
 
-        TValuePackerType packer(false, rowType, ArrowPool_, args.MinFillPercentage);
+        TValuePackerType packer(false, rowType, {}, ArrowPool_, args.MinFillPercentage);
         if (legacyStruct) {
            TUnboxedValueVector columnsCopy = columns;
            NUdf::TUnboxedValue row = HolderFactory.VectorAsArray(columnsCopy);
```
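The packer plumbs the same knob through its constructors. A short sketch of how a caller might override the page size (the `rowType` variable and the 1 KiB value are hypothetical; passing `Nothing()` or `{}`, as the updated unit test does, keeps `TBufferPage::DefaultPageAllocSize`):

```cpp
// Hypothetical call sites; rowType is assumed to be a valid const TType*.
TValuePackerTransport<false> packer(rowType, /*bufferPageAllocSize=*/ 1024u);

// Keeping the default page size, as in the updated test above:
TValuePackerTransport<false> defaultPacker(rowType, Nothing());
```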
```diff
diff --git a/yql/essentials/minikql/mkql_buffer.cpp b/yql/essentials/minikql/mkql_buffer.cpp
index b46a54e8f05..8948dbe1b7c 100644
--- a/yql/essentials/minikql/mkql_buffer.cpp
+++ b/yql/essentials/minikql/mkql_buffer.cpp
@@ -11,25 +11,29 @@ void InitializeGlobalPagedBufferCounters(::NMonitoring::TDynamicCounterPtr root)
 }
 #endif
 
-const size_t TBufferPage::PageCapacity = TBufferPage::PageAllocSize - sizeof(TBufferPage);
+static_assert(IsValidPageAllocSize(TBufferPage::DefaultPageAllocSize));
 
-TBufferPage* TBufferPage::Allocate() {
-    static_assert(PageAllocSize <= std::numeric_limits<ui32>::max());
-    static_assert(sizeof(TBufferPage) < PageAllocSize, "Page allocation size is too small");
-    void* ptr = malloc(PageAllocSize);
+const size_t TBufferPage::DefaultPageCapacity = TBufferPage::DefaultPageAllocSize - sizeof(TBufferPage);
+
+TBufferPage* TBufferPage::Allocate(size_t pageAllocSize) {
+    void* ptr = malloc(pageAllocSize);
     if (!ptr) {
         throw std::bad_alloc();
     }
     TBufferPage* result = ::new (ptr) TBufferPage();
 #if defined(PROFILE_MEMORY_ALLOCATIONS)
-    TotalBytesWastedCounter->Add(result->Wasted());
+    // After allocation the whole page capacity is wasted (PageCapacity - PageSize)
+    TotalBytesWastedCounter->Add(pageAllocSize - sizeof(TBufferPage));
 #endif
     return result;
 }
 
-void TBufferPage::Free(TBufferPage* page) {
+void TBufferPage::Free(TBufferPage* page, size_t pageAllocSize) {
 #if defined(PROFILE_MEMORY_ALLOCATIONS)
-    TotalBytesWastedCounter->Sub(page->Wasted());
+    // After free don't account the page's wasted (PageCapacity - PageSize)
+    TotalBytesWastedCounter->Sub(pageAllocSize - sizeof(TBufferPage) - page->Size());
+#else
+    Y_UNUSED(pageAllocSize);
 #endif
     free(page);
 }
@@ -43,14 +47,14 @@ void TPagedBuffer::AppendPage() {
             page = next;
             page->Clear();
         } else {
-            page = TBufferPage::Allocate();
+            page = TBufferPage::Allocate(PageAllocSize);
             tailPage->Next_ = page;
         }
         tailPage->Size_ = TailSize_;
         ClosedPagesSize_ += TailSize_;
     } else {
         Y_DEBUG_ABORT_UNLESS(Head_ == nullptr);
-        page = TBufferPage::Allocate();
+        page = TBufferPage::Allocate(PageAllocSize);
         Head_ = page->Data();
     }
     TailSize_ = 0;
diff --git a/yql/essentials/minikql/mkql_buffer.h b/yql/essentials/minikql/mkql_buffer.h
index 97de3036c68..5eef3da65b8 100644
--- a/yql/essentials/minikql/mkql_buffer.h
+++ b/yql/essentials/minikql/mkql_buffer.h
@@ -23,10 +23,10 @@ class TPagedBuffer;
 class TBufferPage : private TNonCopyable {
     friend class TPagedBuffer;
 
-    static const size_t PageCapacity;
+    static const size_t DefaultPageCapacity;
 
 public:
-    static const size_t PageAllocSize = 128 * 1024;
+    static const size_t DefaultPageAllocSize = 128 * 1024;
 
     TBufferPage() = default;
     ~TBufferPage() = default;
@@ -55,16 +55,12 @@ public:
         Size_ = 0;
     }
 
-    inline size_t Wasted() const {
-        return PageCapacity - Size_;
-    }
-
 private:
     TBufferPage* Next_ = nullptr;
     size_t Size_ = 0;
 
-    static TBufferPage* Allocate();
-    static void Free(TBufferPage* page);
+    static TBufferPage* Allocate(size_t pageAllocSize = DefaultPageAllocSize);
+    static void Free(TBufferPage* page, size_t pageAllocSize = DefaultPageAllocSize);
 
     static inline const TBufferPage* GetPage(const char* data) {
         Y_DEBUG_ABORT_UNLESS(data);
@@ -77,6 +73,10 @@
     }
 };
 
+static constexpr bool IsValidPageAllocSize(size_t size) {
+    return size <= std::numeric_limits<ui32>::max() && sizeof(TBufferPage) < size;
+}
+
 class TPagedBuffer : private TNonCopyable {
 public:
     using TPtr = std::shared_ptr<TPagedBuffer>;
@@ -84,6 +84,13 @@ class TPagedBuffer : private TNonCopyable {
 
     TPagedBuffer() = default;
 
+    explicit TPagedBuffer(size_t pageAllocSize)
+        : PageAllocSize(pageAllocSize)
+        , PageCapacity(pageAllocSize - sizeof(TBufferPage))
+    {
+        Y_ENSURE(IsValidPageAllocSize(pageAllocSize));
+    }
+
     ~TPagedBuffer() {
         if (Head_) {
             auto* tailPage = TBufferPage::GetPage(Tail_);
@@ -93,7 +100,7 @@ class TPagedBuffer : private TNonCopyable {
             while (curr) {
                 auto drop = curr;
                 curr = curr->Next_;
-                TBufferPage::Free(drop);
+                TBufferPage::Free(drop, PageAllocSize);
             }
         }
     }
@@ -187,7 +194,7 @@ class TPagedBuffer : private TNonCopyable {
         // TODO: not wasted or never called?
         Tail_ = Head_;
         ClosedPagesSize_ = HeadReserve_ = 0;
-        TailSize_ = (-size_t(Tail_ == nullptr)) & TBufferPage::PageCapacity;
+        TailSize_ = (-size_t(Tail_ == nullptr)) & PageCapacity;
     }
 
     inline void EraseBack(size_t len) {
@@ -199,7 +206,7 @@
     }
 
     inline void Advance(size_t len) {
-        if (Y_LIKELY(TailSize_ + len <= TBufferPage::PageCapacity)) {
+        if (Y_LIKELY(TailSize_ + len <= PageCapacity)) {
             TailSize_ += len;
 #if defined(PROFILE_MEMORY_ALLOCATIONS)
             TotalBytesWastedCounter->Sub(len);
@@ -207,7 +214,7 @@
             return;
         }
 
-        MKQL_ENSURE(len <= TBufferPage::PageCapacity, "Advance() size too big");
+        MKQL_ENSURE(len <= PageCapacity, "Advance() size too big");
         AppendPage();
         TailSize_ = len;
 #if defined(PROFILE_MEMORY_ALLOCATIONS)
@@ -222,12 +229,12 @@
 
     inline void Append(const char* data, size_t size) {
         while (size) {
-            if (TailSize_ == TBufferPage::PageCapacity) {
+            if (TailSize_ == PageCapacity) {
                 AppendPage();
             }
-            Y_DEBUG_ABORT_UNLESS(TailSize_ < TBufferPage::PageCapacity);
+            Y_DEBUG_ABORT_UNLESS(TailSize_ < PageCapacity);
 
-            size_t avail = TBufferPage::PageCapacity - TailSize_;
+            size_t avail = PageCapacity - TailSize_;
             size_t chunk = std::min(avail, size);
             std::memcpy(Pos(), data, chunk);
             TailSize_ += chunk;
@@ -243,11 +250,14 @@
 private:
     void AppendPage();
 
+    const size_t PageAllocSize = TBufferPage::DefaultPageAllocSize;
+    const size_t PageCapacity = TBufferPage::DefaultPageCapacity;
+
     char* Head_ = nullptr;
     char* Tail_ = nullptr;
 
     // TailSize_ is initialized as if last page is full, this way we can simplifiy check in Advance()
-    size_t TailSize_ = TBufferPage::PageCapacity;
+    size_t TailSize_ = PageCapacity;
     size_t HeadReserve_ = 0;
     size_t ClosedPagesSize_ = 0;
 };
```
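For intuition, the wasted-bytes accounting that motivated the change reduces to simple arithmetic over the chosen page size. A standalone sketch of the invariant (plain C++; `PageHeader` is a stand-in for the real `TBufferPage` header):

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for TBufferPage's header fields (Next_ pointer + Size_).
struct PageHeader {
    PageHeader* Next = nullptr;
    size_t Size = 0;
};

int main() {
    const size_t pageAllocSize = 4 * 1024; // per-object choice
    const size_t pageCapacity = pageAllocSize - sizeof(PageHeader);

    // Allocate(): the whole capacity counts as wasted at first.
    size_t wasted = pageCapacity;

    // Advance(len): every byte written stops being wasted.
    const size_t written = 100;
    wasted -= written;
    assert(wasted == pageCapacity - written);

    // Free(): the remaining (capacity - size) is subtracted back out,
    // so a fully accounted page nets to zero.
    wasted -= pageCapacity - written;
    assert(wasted == 0);
    return 0;
}
```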
