summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilezhankin <[email protected]>2025-06-16 11:20:01 +0300
committerilezhankin <[email protected]>2025-06-16 12:14:12 +0300
commit2779e9c86ed4699ecbc0ca2fde8d7bcb4cf7da5c (patch)
treecd2716c52737851540ca909d8c9bab35a31bde6f
parentc08395814b322c9dd280031bb20cdb1750467635 (diff)
Allow to modify the default buffer page size in TPagedBuffer
После использования недавно добавленной метрики wasted-памяти стало понятно, что текущий универсальный размер страниц в `TPagedBuffer` слишком велик. В этом ПР добавляется возможность устанавливать размер страниц снаружи — отдельно для каждого объекта `TPagedBuffer`. commit_hash:f82fa4e6162a129bb4442f18657c32b074cb07f7
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_pack.cpp12
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_pack.h7
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp2
-rw-r--r--yql/essentials/minikql/mkql_buffer.cpp24
-rw-r--r--yql/essentials/minikql/mkql_buffer.h42
5 files changed, 53 insertions, 34 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
index ace74a62df9..95e2148e728 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
@@ -1067,8 +1067,9 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value)
// Transport packer
template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
+TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
: Type_(type)
+ , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
, ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
@@ -1078,8 +1079,9 @@ TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* typ
}
template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
+TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
: Type_(type)
+ , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
, ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
@@ -1146,7 +1148,7 @@ template<bool Fast>
TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const {
MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls");
MKQL_ENSURE(!IsBlock_, "Pack() should not be used for blocks");
- TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>();
+ TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>(BufferPageAllocSize_);
if constexpr (Fast) {
PackImpl<Fast, false>(Type_, *result, value, State_);
} else {
@@ -1160,7 +1162,7 @@ TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& v
template<bool Fast>
void TValuePackerTransport<Fast>::StartPack() {
- Buffer_ = std::make_shared<TPagedBuffer>();
+ Buffer_ = std::make_shared<TPagedBuffer>(BufferPageAllocSize_);
if constexpr (Fast) {
// reserve place for list item count
Buffer_->ReserveHeader(sizeof(ItemCount_));
@@ -1442,7 +1444,7 @@ void TValuePackerTransport<Fast>::BuildMeta(TPagedBuffer::TPtr& buffer, bool add
} else {
s.OptionalMaskReserve = maskSize;
- TPagedBuffer::TPtr resultBuffer = std::make_shared<TPagedBuffer>();
+ TPagedBuffer::TPtr resultBuffer = std::make_shared<TPagedBuffer>(BufferPageAllocSize_);
SerializeMeta(*resultBuffer, useMask, s.OptionalUsageMask, fullLen, s.Properties.Test(EPackProps::SingleOptional));
if (addItemCount) {
PackData<Fast>(ItemCount_, *resultBuffer);
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.h b/yql/essentials/minikql/computation/mkql_computation_node_pack.h
index 7325695a71d..12aac705bb9 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.h
@@ -75,9 +75,11 @@ class TValuePackerTransport {
public:
using TSelf = TValuePackerTransport<Fast>;
- explicit TValuePackerTransport(const TType* type, arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+ explicit TValuePackerTransport(const TType* type,
+ TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
// for compatibility with TValuePackerGeneric - stable packing is not supported
- TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+ TValuePackerTransport(bool stable, const TType* type,
+ TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
// AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout
TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
@@ -115,6 +117,7 @@ private:
const TType* const Type_;
ui64 ItemCount_ = 0;
TPagedBuffer::TPtr Buffer_;
+ const size_t BufferPageAllocSize_;
mutable NDetails::TPackerState State_;
mutable NDetails::TPackerState IncrementalState_;
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
index cc175f3b7d8..19bca68b95f 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -823,7 +823,7 @@ protected:
columns.emplace_back(HolderFactory.CreateArrowBlock(std::move(datum)));
}
- TValuePackerType packer(false, rowType, ArrowPool_, args.MinFillPercentage);
+ TValuePackerType packer(false, rowType, {}, ArrowPool_, args.MinFillPercentage);
if (legacyStruct) {
TUnboxedValueVector columnsCopy = columns;
NUdf::TUnboxedValue row = HolderFactory.VectorAsArray(columnsCopy);
diff --git a/yql/essentials/minikql/mkql_buffer.cpp b/yql/essentials/minikql/mkql_buffer.cpp
index b46a54e8f05..8948dbe1b7c 100644
--- a/yql/essentials/minikql/mkql_buffer.cpp
+++ b/yql/essentials/minikql/mkql_buffer.cpp
@@ -11,25 +11,29 @@ void InitializeGlobalPagedBufferCounters(::NMonitoring::TDynamicCounterPtr root)
}
#endif
-const size_t TBufferPage::PageCapacity = TBufferPage::PageAllocSize - sizeof(TBufferPage);
+static_assert(IsValidPageAllocSize(TBufferPage::DefaultPageAllocSize));
-TBufferPage* TBufferPage::Allocate() {
- static_assert(PageAllocSize <= std::numeric_limits<ui32>::max());
- static_assert(sizeof(TBufferPage) < PageAllocSize, "Page allocation size is too small");
- void* ptr = malloc(PageAllocSize);
+const size_t TBufferPage::DefaultPageCapacity = TBufferPage::DefaultPageAllocSize - sizeof(TBufferPage);
+
+TBufferPage* TBufferPage::Allocate(size_t pageAllocSize) {
+ void* ptr = malloc(pageAllocSize);
if (!ptr) {
throw std::bad_alloc();
}
TBufferPage* result = ::new (ptr) TBufferPage();
#if defined(PROFILE_MEMORY_ALLOCATIONS)
- TotalBytesWastedCounter->Add(result->Wasted());
+        // Right after allocation Size_ == 0, so the whole capacity
+        // (pageAllocSize - sizeof(TBufferPage)) counts as wasted
+ TotalBytesWastedCounter->Add(pageAllocSize - sizeof(TBufferPage));
#endif
return result;
}
-void TBufferPage::Free(TBufferPage* page) {
+void TBufferPage::Free(TBufferPage* page, size_t pageAllocSize) {
#if defined(PROFILE_MEMORY_ALLOCATIONS)
- TotalBytesWastedCounter->Sub(page->Wasted());
+        // On free, stop accounting this page's wasted bytes
+        // (capacity minus the bytes actually used, i.e. page->Size())
+ TotalBytesWastedCounter->Sub(pageAllocSize - sizeof(TBufferPage) - page->Size());
+#else
+ Y_UNUSED(pageAllocSize);
#endif
free(page);
}
@@ -43,14 +47,14 @@ void TPagedBuffer::AppendPage() {
page = next;
page->Clear();
} else {
- page = TBufferPage::Allocate();
+ page = TBufferPage::Allocate(PageAllocSize);
tailPage->Next_ = page;
}
tailPage->Size_ = TailSize_;
ClosedPagesSize_ += TailSize_;
} else {
Y_DEBUG_ABORT_UNLESS(Head_ == nullptr);
- page = TBufferPage::Allocate();
+ page = TBufferPage::Allocate(PageAllocSize);
Head_ = page->Data();
}
TailSize_ = 0;
diff --git a/yql/essentials/minikql/mkql_buffer.h b/yql/essentials/minikql/mkql_buffer.h
index 97de3036c68..5eef3da65b8 100644
--- a/yql/essentials/minikql/mkql_buffer.h
+++ b/yql/essentials/minikql/mkql_buffer.h
@@ -23,10 +23,10 @@ class TPagedBuffer;
class TBufferPage : private TNonCopyable {
friend class TPagedBuffer;
- static const size_t PageCapacity;
+ static const size_t DefaultPageCapacity;
public:
- static const size_t PageAllocSize = 128 * 1024;
+ static const size_t DefaultPageAllocSize = 128 * 1024;
TBufferPage() = default;
~TBufferPage() = default;
@@ -55,16 +55,12 @@ public:
Size_ = 0;
}
- inline size_t Wasted() const {
- return PageCapacity - Size_;
- }
-
private:
TBufferPage* Next_ = nullptr;
size_t Size_ = 0;
- static TBufferPage* Allocate();
- static void Free(TBufferPage* page);
+ static TBufferPage* Allocate(size_t pageAllocSize = DefaultPageAllocSize);
+ static void Free(TBufferPage* page, size_t pageAllocSize = DefaultPageAllocSize);
static inline const TBufferPage* GetPage(const char* data) {
Y_DEBUG_ABORT_UNLESS(data);
@@ -77,6 +73,10 @@ private:
}
};
+static constexpr bool IsValidPageAllocSize(size_t size) {
+ return size <= std::numeric_limits<ui32>::max() && sizeof(TBufferPage) < size;
+}
+
class TPagedBuffer : private TNonCopyable {
public:
using TPtr = std::shared_ptr<TPagedBuffer>;
@@ -84,6 +84,13 @@ class TPagedBuffer : private TNonCopyable {
TPagedBuffer() = default;
+ explicit TPagedBuffer(size_t pageAllocSize)
+ : PageAllocSize(pageAllocSize)
+ , PageCapacity(pageAllocSize - sizeof(TBufferPage))
+ {
+ Y_ENSURE(IsValidPageAllocSize(pageAllocSize));
+ }
+
~TPagedBuffer() {
if (Head_) {
auto* tailPage = TBufferPage::GetPage(Tail_);
@@ -93,7 +100,7 @@ class TPagedBuffer : private TNonCopyable {
while (curr) {
auto drop = curr;
curr = curr->Next_;
- TBufferPage::Free(drop);
+ TBufferPage::Free(drop, PageAllocSize);
}
}
}
@@ -187,7 +194,7 @@ class TPagedBuffer : private TNonCopyable {
// TODO: not wasted or never called?
Tail_ = Head_;
ClosedPagesSize_ = HeadReserve_ = 0;
- TailSize_ = (-size_t(Tail_ == nullptr)) & TBufferPage::PageCapacity;
+ TailSize_ = (-size_t(Tail_ == nullptr)) & PageCapacity;
}
inline void EraseBack(size_t len) {
@@ -199,7 +206,7 @@ class TPagedBuffer : private TNonCopyable {
}
inline void Advance(size_t len) {
- if (Y_LIKELY(TailSize_ + len <= TBufferPage::PageCapacity)) {
+ if (Y_LIKELY(TailSize_ + len <= PageCapacity)) {
TailSize_ += len;
#if defined(PROFILE_MEMORY_ALLOCATIONS)
TotalBytesWastedCounter->Sub(len);
@@ -207,7 +214,7 @@ class TPagedBuffer : private TNonCopyable {
return;
}
- MKQL_ENSURE(len <= TBufferPage::PageCapacity, "Advance() size too big");
+ MKQL_ENSURE(len <= PageCapacity, "Advance() size too big");
AppendPage();
TailSize_ = len;
#if defined(PROFILE_MEMORY_ALLOCATIONS)
@@ -222,12 +229,12 @@ class TPagedBuffer : private TNonCopyable {
inline void Append(const char* data, size_t size) {
while (size) {
- if (TailSize_ == TBufferPage::PageCapacity) {
+ if (TailSize_ == PageCapacity) {
AppendPage();
}
- Y_DEBUG_ABORT_UNLESS(TailSize_ < TBufferPage::PageCapacity);
+ Y_DEBUG_ABORT_UNLESS(TailSize_ < PageCapacity);
- size_t avail = TBufferPage::PageCapacity - TailSize_;
+ size_t avail = PageCapacity - TailSize_;
size_t chunk = std::min(avail, size);
std::memcpy(Pos(), data, chunk);
TailSize_ += chunk;
@@ -243,11 +250,14 @@ class TPagedBuffer : private TNonCopyable {
private:
void AppendPage();
+ const size_t PageAllocSize = TBufferPage::DefaultPageAllocSize;
+ const size_t PageCapacity = TBufferPage::DefaultPageCapacity;
+
char* Head_ = nullptr;
char* Tail_ = nullptr;
// TailSize_ is initialized as if last page is full, this way we can simplifiy check in Advance()
- size_t TailSize_ = TBufferPage::PageCapacity;
+ size_t TailSize_ = PageCapacity;
size_t HeadReserve_ = 0;
size_t ClosedPagesSize_ = 0;
};