aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2023-01-03 13:23:49 +0300
committerbabenko <babenko@yandex-team.com>2023-01-03 13:23:49 +0300
commit85dbae30a801094e01a9aa5e4ecb1be070420ed4 (patch)
tree1381aac9994baae96382a5231e8b1596d876320f /library
parent8ee4eaa91898ce3adcdf302ba33311fc9e627282 (diff)
downloadydb-85dbae30a801094e01a9aa5e4ecb1be070420ed4.tar.gz
Move TChunkedMemoryPool, TChunkedMemoryAllocator, TChunkedMemoryPoolOutput to library
Move TChunkedMemoryPool, TChunkedMemoryAllocator, TChunkedMemoryPoolOutput to library wip
Diffstat (limited to 'library')
-rw-r--r--library/cpp/yt/memory/CMakeLists.darwin.txt4
-rw-r--r--library/cpp/yt/memory/CMakeLists.linux-aarch64.txt4
-rw-r--r--library/cpp/yt/memory/CMakeLists.linux.txt4
-rw-r--r--library/cpp/yt/memory/chunked_input_stream.cpp34
-rw-r--r--library/cpp/yt/memory/chunked_input_stream.h29
-rw-r--r--library/cpp/yt/memory/chunked_memory_allocator-inl.h44
-rw-r--r--library/cpp/yt/memory/chunked_memory_allocator.cpp57
-rw-r--r--library/cpp/yt/memory/chunked_memory_allocator.h71
-rw-r--r--library/cpp/yt/memory/chunked_memory_pool-inl.h142
-rw-r--r--library/cpp/yt/memory/chunked_memory_pool.cpp200
-rw-r--r--library/cpp/yt/memory/chunked_memory_pool.h159
-rw-r--r--library/cpp/yt/memory/chunked_memory_pool_output.cpp65
-rw-r--r--library/cpp/yt/memory/chunked_memory_pool_output.h43
-rw-r--r--library/cpp/yt/memory/public.h15
-rw-r--r--library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp39
-rw-r--r--library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp74
16 files changed, 984 insertions, 0 deletions
diff --git a/library/cpp/yt/memory/CMakeLists.darwin.txt b/library/cpp/yt/memory/CMakeLists.darwin.txt
index 5bcfddec02..e3c3cae0f0 100644
--- a/library/cpp/yt/memory/CMakeLists.darwin.txt
+++ b/library/cpp/yt/memory/CMakeLists.darwin.txt
@@ -17,6 +17,10 @@ target_link_libraries(cpp-yt-memory PUBLIC
)
target_sources(cpp-yt-memory PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp
diff --git a/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt b/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt
index 0e7c58d94f..2950017992 100644
--- a/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt
@@ -18,6 +18,10 @@ target_link_libraries(cpp-yt-memory PUBLIC
)
target_sources(cpp-yt-memory PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp
diff --git a/library/cpp/yt/memory/CMakeLists.linux.txt b/library/cpp/yt/memory/CMakeLists.linux.txt
index 0e7c58d94f..2950017992 100644
--- a/library/cpp/yt/memory/CMakeLists.linux.txt
+++ b/library/cpp/yt/memory/CMakeLists.linux.txt
@@ -18,6 +18,10 @@ target_link_libraries(cpp-yt-memory PUBLIC
)
target_sources(cpp-yt-memory PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp
diff --git a/library/cpp/yt/memory/chunked_input_stream.cpp b/library/cpp/yt/memory/chunked_input_stream.cpp
new file mode 100644
index 0000000000..ea579a46ab
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_input_stream.cpp
@@ -0,0 +1,34 @@
+#include "chunked_input_stream.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TChunkedInputStream::TChunkedInputStream(std::vector<TSharedRef> blocks)
+ : Blocks_(std::move(blocks))
+{ }
+
+size_t TChunkedInputStream::DoNext(const void** ptr, size_t len)
+{
+ SkipCompletedBlocks();
+ if (Index_ == Blocks_.size()) {
+ *ptr = nullptr;
+ return 0;
+ }
+ *ptr = Blocks_[Index_].Begin() + Position_;
+ size_t toSkip = std::min(Blocks_[Index_].Size() - Position_, len);
+ Position_ += toSkip;
+ return toSkip;
+}
+
+void TChunkedInputStream::SkipCompletedBlocks()
+{
+ while (Index_ < Blocks_.size() && Position_ == Blocks_[Index_].Size()) {
+ Index_ += 1;
+ Position_ = 0;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/chunked_input_stream.h b/library/cpp/yt/memory/chunked_input_stream.h
new file mode 100644
index 0000000000..baef2a4c66
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_input_stream.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "ref.h"
+
+#include <util/stream/zerocopy.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TChunkedInputStream
+ : public IZeroCopyInput
+{
+public:
+ explicit TChunkedInputStream(std::vector<TSharedRef> blocks);
+
+ size_t DoNext(const void** ptr, size_t len) override;
+
+private:
+ const std::vector<TSharedRef> Blocks_;
+ size_t Index_ = 0;
+ size_t Position_ = 0;
+
+ void SkipCompletedBlocks();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/chunked_memory_allocator-inl.h b/library/cpp/yt/memory/chunked_memory_allocator-inl.h
new file mode 100644
index 0000000000..fe66060b2d
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_memory_allocator-inl.h
@@ -0,0 +1,44 @@
+#ifndef CHUNKED_MEMORY_ALLOCATOR_INL_H_
+#error "Direct inclusion of this file is not allowed, include chunked_memory_allocator.h"
+// For the sake of sane code completion.
+#include "chunked_memory_allocator.h"
+#endif
+
+#include "serialize.h"
+
+#include <util/system/align.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+inline TSharedMutableRef TChunkedMemoryAllocator::AllocateUnaligned(i64 size)
+{
+ // Fast path.
+ if (FreeZoneEnd_ >= FreeZoneBegin_ + size) {
+ FreeZoneEnd_ -= size;
+ return Chunk_.Slice(FreeZoneEnd_, FreeZoneEnd_ + size);
+ }
+
+ // Slow path.
+ return AllocateUnalignedSlow(size);
+}
+
+inline TSharedMutableRef TChunkedMemoryAllocator::AllocateAligned(i64 size, int align)
+{
+ // NB: This can lead to FreeZoneBegin_ >= FreeZoneEnd_ in which case the chunk is full.
+ FreeZoneBegin_ = AlignUp(FreeZoneBegin_, align);
+
+ // Fast path.
+ if (FreeZoneBegin_ + size <= FreeZoneEnd_) {
+ FreeZoneBegin_ += size;
+ return Chunk_.Slice(FreeZoneBegin_ - size, FreeZoneBegin_);
+ }
+
+ // Slow path.
+ return AllocateAlignedSlow(size, align);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/chunked_memory_allocator.cpp b/library/cpp/yt/memory/chunked_memory_allocator.cpp
new file mode 100644
index 0000000000..3560f8096d
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_memory_allocator.cpp
@@ -0,0 +1,57 @@
+#include "chunked_memory_allocator.h"
+#include "serialize.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+const i64 TChunkedMemoryAllocator::DefaultChunkSize = 4096;
+const double TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio = 0.25;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TChunkedMemoryAllocator::TChunkedMemoryAllocator(
+ i64 chunkSize,
+ double maxSmallBlockSizeRatio,
+ TRefCountedTypeCookie tagCookie)
+ : ChunkSize_(chunkSize)
+ , MaxSmallBlockSize_(static_cast<i64>(ChunkSize_ * maxSmallBlockSizeRatio))
+ , TagCookie_(tagCookie)
+{ }
+
+TSharedMutableRef TChunkedMemoryAllocator::AllocateUnalignedSlow(i64 size)
+{
+ auto large = AllocateSlowCore(size);
+ if (large) {
+ return large;
+ }
+ return AllocateUnaligned(size);
+}
+
+TSharedMutableRef TChunkedMemoryAllocator::AllocateAlignedSlow(i64 size, int align)
+{
+ // NB: Do not rely on any particular alignment of chunks.
+ auto large = AllocateSlowCore(size + align);
+ if (large) {
+ auto* alignedBegin = AlignUp(large.Begin(), align);
+ return large.Slice(alignedBegin, alignedBegin + size);
+ }
+ return AllocateAligned(size, align);
+}
+
+TSharedMutableRef TChunkedMemoryAllocator::AllocateSlowCore(i64 size)
+{
+ if (size > MaxSmallBlockSize_) {
+ return TSharedMutableRef::Allocate(size, {.InitializeStorage = false}, TagCookie_);
+ }
+
+ Chunk_ = TSharedMutableRef::Allocate(ChunkSize_, {.InitializeStorage = false}, TagCookie_);
+ FreeZoneBegin_ = Chunk_.Begin();
+ FreeZoneEnd_ = Chunk_.End();
+
+ return TSharedMutableRef();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/chunked_memory_allocator.h b/library/cpp/yt/memory/chunked_memory_allocator.h
new file mode 100644
index 0000000000..33559d8aa7
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_memory_allocator.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include "ref.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TDefaultChunkedMemoryAllocatorTag
+{ };
+
+//! Mimics TChunkedMemoryPool but acts as an allocator returning shared refs.
+class TChunkedMemoryAllocator
+ : private TNonCopyable
+{
+public:
+ static const i64 DefaultChunkSize;
+ static const double DefaultMaxSmallBlockSizeRatio;
+
+ explicit TChunkedMemoryAllocator(
+ i64 chunkSize = DefaultChunkSize,
+ double maxSmallBlockSizeRatio = DefaultMaxSmallBlockSizeRatio,
+ TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedMemoryAllocatorTag>());
+
+ template <class TTag>
+ explicit TChunkedMemoryAllocator(
+ TTag /*tag*/ = TTag(),
+ i64 chunkSize = DefaultChunkSize,
+ double maxSmallBlockSizeRatio = DefaultMaxSmallBlockSizeRatio)
+ : TChunkedMemoryAllocator(
+ chunkSize,
+ maxSmallBlockSizeRatio,
+ GetRefCountedTypeCookie<TTag>())
+ { }
+
+    //! Allocates #size bytes without any alignment.
+ TSharedMutableRef AllocateUnaligned(i64 size);
+
+    //! Allocates #size bytes aligned with the given granularity (8 bytes by default).
+ TSharedMutableRef AllocateAligned(i64 size, int align = 8);
+
+private:
+ const i64 ChunkSize_;
+ const i64 MaxSmallBlockSize_;
+ const TRefCountedTypeCookie TagCookie_;
+
+ char EmptyBuf_[0];
+
+ // Chunk memory layout:
+ // |AAAA|....|UUUU|
+ // Legend:
+ // A aligned allocations
+ // U unaligned allocations
+ // . free zone
+ char* FreeZoneBegin_ = EmptyBuf_;
+ char* FreeZoneEnd_ = EmptyBuf_;
+ TSharedMutableRef Chunk_;
+
+ TSharedMutableRef AllocateUnalignedSlow(i64 size);
+ TSharedMutableRef AllocateAlignedSlow(i64 size, int align);
+ TSharedMutableRef AllocateSlowCore(i64 size);
+
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define CHUNKED_MEMORY_ALLOCATOR_INL_H_
+#include "chunked_memory_allocator-inl.h"
+#undef CHUNKED_MEMORY_ALLOCATOR_INL_H_
diff --git a/library/cpp/yt/memory/chunked_memory_pool-inl.h b/library/cpp/yt/memory/chunked_memory_pool-inl.h
new file mode 100644
index 0000000000..267c3e9221
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_memory_pool-inl.h
@@ -0,0 +1,142 @@
+#ifndef CHUNKED_MEMORY_POOL_INL_H_
+#error "Direct inclusion of this file is not allowed, include chunked_memory_pool.h"
+// For the sake of sane code completion.
+#include "chunked_memory_pool.h"
+#endif
+
+#include "serialize.h"
+
+#include <library/cpp/yt/malloc/malloc.h>
+
+#include <util/system/align.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+inline void TAllocationHolder::operator delete(void* ptr) noexcept
+{
+ ::free(ptr);
+}
+
+inline TMutableRef TAllocationHolder::GetRef() const
+{
+ return Ref_;
+}
+
+template <class TDerived>
+TDerived* TAllocationHolder::Allocate(size_t size, TRefCountedTypeCookie cookie)
+{
+ auto requestedSize = sizeof(TDerived) + size;
+ auto* ptr = ::malloc(requestedSize);
+ auto allocatedSize = ::malloc_usable_size(ptr);
+ if (allocatedSize) {
+ size += allocatedSize - requestedSize;
+ }
+
+ auto* instance = static_cast<TDerived*>(ptr);
+
+ try {
+ new (instance) TDerived(TMutableRef(instance + 1, size), cookie);
+ } catch (const std::exception& ex) {
+ // Do not forget to free the memory.
+ ::free(ptr);
+ throw;
+ }
+
+ return instance;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+inline TChunkedMemoryPool::TChunkedMemoryPool()
+ : TChunkedMemoryPool(
+ GetRefCountedTypeCookie<TDefaultChunkedMemoryPoolTag>())
+{ }
+
+template <class TTag>
+inline TChunkedMemoryPool::TChunkedMemoryPool(
+ TTag,
+ size_t startChunkSize)
+ : TChunkedMemoryPool(
+ GetRefCountedTypeCookie<TTag>(),
+ startChunkSize)
+{ }
+
+inline char* TChunkedMemoryPool::AllocateUnaligned(size_t size)
+{
+ // Fast path.
+ if (FreeZoneEnd_ >= FreeZoneBegin_ + size) {
+ FreeZoneEnd_ -= size;
+ Size_ += size;
+ return FreeZoneEnd_;
+ }
+
+ // Slow path.
+ return AllocateUnalignedSlow(size);
+}
+
+inline char* TChunkedMemoryPool::AllocateAligned(size_t size, int align)
+{
+ // NB: This can lead to FreeZoneBegin_ >= FreeZoneEnd_ in which case the chunk is full.
+ FreeZoneBegin_ = AlignUp(FreeZoneBegin_, align);
+
+ // Fast path.
+ if (FreeZoneBegin_ + size <= FreeZoneEnd_) {
+ char* result = FreeZoneBegin_;
+ Size_ += size;
+ FreeZoneBegin_ += size;
+ return result;
+ }
+
+ // Slow path.
+ return AllocateAlignedSlow(size, align);
+}
+
+template <class T>
+inline T* TChunkedMemoryPool::AllocateUninitialized(int n, int align)
+{
+ return reinterpret_cast<T*>(AllocateAligned(sizeof(T) * n, align));
+}
+
+template <class T>
+inline TMutableRange<T> TChunkedMemoryPool::Capture(TRange<T> src, int align)
+{
+ auto* dst = AllocateUninitialized<T>(src.Size(), align);
+ ::memcpy(dst, src.Begin(), sizeof(T) * src.Size());
+ return TMutableRange<T>(dst, src.Size());
+}
+
+inline void TChunkedMemoryPool::Free(char* from, char* to)
+{
+ if (FreeZoneBegin_ == to) {
+ FreeZoneBegin_ = from;
+ }
+ if (FreeZoneEnd_ == from) {
+ FreeZoneEnd_ = to;
+ }
+}
+
+inline void TChunkedMemoryPool::Clear()
+{
+ Size_ = 0;
+
+ if (Chunks_.empty()) {
+ FreeZoneBegin_ = nullptr;
+ FreeZoneEnd_ = nullptr;
+ NextChunkIndex_ = 0;
+ } else {
+ FreeZoneBegin_ = Chunks_.front()->GetRef().Begin();
+ FreeZoneEnd_ = Chunks_.front()->GetRef().End();
+ NextChunkIndex_ = 1;
+ }
+
+ for (const auto& block : OtherBlocks_) {
+ Capacity_ -= block->GetRef().Size();
+ }
+ OtherBlocks_.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/chunked_memory_pool.cpp b/library/cpp/yt/memory/chunked_memory_pool.cpp
new file mode 100644
index 0000000000..a10a6fe724
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_memory_pool.cpp
@@ -0,0 +1,200 @@
+#include "chunked_memory_pool.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAllocationHolder::TAllocationHolder(TMutableRef ref, TRefCountedTypeCookie cookie)
+ : Ref_(ref)
+#ifdef YT_ENABLE_REF_COUNTED_TRACKING
+ , Cookie_(cookie)
+#endif
+{
+#ifdef YT_ENABLE_REF_COUNTED_TRACKING
+ if (Cookie_ != NullRefCountedTypeCookie) {
+ TRefCountedTrackerFacade::AllocateTagInstance(Cookie_);
+ TRefCountedTrackerFacade::AllocateSpace(Cookie_, Ref_.Size());
+ }
+#endif
+}
+
+TAllocationHolder::~TAllocationHolder()
+{
+#ifdef YT_ENABLE_REF_COUNTED_TRACKING
+ if (Cookie_ != NullRefCountedTypeCookie) {
+ TRefCountedTrackerFacade::FreeTagInstance(Cookie_);
+ TRefCountedTrackerFacade::FreeSpace(Cookie_, Ref_.Size());
+ }
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDefaultMemoryChunkProvider
+ : public IMemoryChunkProvider
+{
+public:
+ std::unique_ptr<TAllocationHolder> Allocate(size_t size, TRefCountedTypeCookie cookie) override
+ {
+ return std::unique_ptr<TAllocationHolder>(TAllocationHolder::Allocate<TAllocationHolder>(size, cookie));
+ }
+};
+
+const IMemoryChunkProviderPtr& GetDefaultMemoryChunkProvider()
+{
+ static const IMemoryChunkProviderPtr Result = New<TDefaultMemoryChunkProvider>();
+ return Result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TChunkedMemoryPool::TChunkedMemoryPool(
+ TRefCountedTypeCookie tagCookie,
+ IMemoryChunkProviderPtr chunkProvider,
+ size_t startChunkSize)
+ : TagCookie_(tagCookie)
+ , ChunkProviderHolder_(std::move(chunkProvider))
+ , ChunkProvider_(ChunkProviderHolder_.Get())
+{
+ Initialize(startChunkSize);
+}
+
+TChunkedMemoryPool::TChunkedMemoryPool(
+ TRefCountedTypeCookie tagCookie,
+ size_t startChunkSize)
+ : TagCookie_(tagCookie)
+ , ChunkProvider_(GetDefaultMemoryChunkProvider().Get())
+{
+ Initialize(startChunkSize);
+}
+
+void TChunkedMemoryPool::Initialize(size_t startChunkSize)
+{
+ NextSmallSize_ = startChunkSize;
+ FreeZoneBegin_ = nullptr;
+ FreeZoneEnd_ = nullptr;
+}
+
+void TChunkedMemoryPool::Purge()
+{
+ Chunks_.clear();
+ OtherBlocks_.clear();
+ Size_ = 0;
+ Capacity_ = 0;
+ NextChunkIndex_ = 0;
+ FreeZoneBegin_ = nullptr;
+ FreeZoneEnd_ = nullptr;
+}
+
+char* TChunkedMemoryPool::AllocateUnalignedSlow(size_t size)
+{
+ auto* large = AllocateSlowCore(size);
+ if (large) {
+ return large;
+ }
+ return AllocateUnaligned(size);
+}
+
+char* TChunkedMemoryPool::AllocateAlignedSlow(size_t size, int align)
+{
+ // NB: Do not rely on any particular alignment of chunks.
+ auto* large = AllocateSlowCore(size + align);
+ if (large) {
+ return AlignUp(large, align);
+ }
+ return AllocateAligned(size, align);
+}
+
+char* TChunkedMemoryPool::AllocateSlowCore(size_t size)
+{
+ TMutableRef ref;
+ if (size > RegularChunkSize) {
+ auto block = ChunkProvider_->Allocate(size, TagCookie_);
+ ref = block->GetRef();
+ Size_ += size;
+ Capacity_ += ref.Size();
+ OtherBlocks_.push_back(std::move(block));
+ return ref.Begin();
+ }
+
+ YT_VERIFY(NextChunkIndex_ <= std::ssize(Chunks_));
+
+ if (NextSmallSize_ < RegularChunkSize) {
+ auto block = ChunkProvider_->Allocate(std::max(NextSmallSize_, size), TagCookie_);
+ ref = block->GetRef();
+ Capacity_ += ref.Size();
+ OtherBlocks_.push_back(std::move(block));
+ NextSmallSize_ = 2 * ref.Size();
+ } else if (NextChunkIndex_ == std::ssize(Chunks_)) {
+ auto chunk = ChunkProvider_->Allocate(RegularChunkSize, TagCookie_);
+ ref = chunk->GetRef();
+ Capacity_ += ref.Size();
+ Chunks_.push_back(std::move(chunk));
+ ++NextChunkIndex_;
+ } else {
+ ref = Chunks_[NextChunkIndex_++]->GetRef();
+ }
+
+ FreeZoneBegin_ = ref.Begin();
+ FreeZoneEnd_ = ref.End();
+
+ return nullptr;
+}
+
+void TChunkedMemoryPool::Absorb(TChunkedMemoryPool&& other)
+{
+ YT_VERIFY(ChunkProvider_ == other.ChunkProvider_);
+
+ OtherBlocks_.reserve(OtherBlocks_.size() + other.OtherBlocks_.size());
+ for (auto& block : other.OtherBlocks_) {
+ OtherBlocks_.push_back(std::move(block));
+ }
+ other.OtherBlocks_.clear();
+
+ // Suppose that
+ // - "A" is filled blocks of the current pool;
+ // - "a" is free blocks of the current pool;
+ // - "B" is filled blocks of the other pool;
+ // - "b" is free blocks of the other pool.
+    // Then, from the initial layouts "AA...Aaa...a" and "BB...Bbb...b" we obtain "BB...BAA...Aaa...abb...b".
+ Chunks_.reserve(Chunks_.size() + other.Chunks_.size());
+ size_t oldSize = Chunks_.size();
+ for (auto& chunk : other.Chunks_) {
+ Chunks_.push_back(std::move(chunk));
+ }
+ // Transform "AA...Aaa...aBB...B" => "BB...BAA...Aaa...a"
+ std::rotate(Chunks_.begin(), Chunks_.begin() + oldSize, Chunks_.begin() + oldSize + other.NextChunkIndex_);
+ if (NextChunkIndex_ == 0) {
+ FreeZoneBegin_ = other.FreeZoneBegin_;
+ FreeZoneEnd_ = other.FreeZoneEnd_;
+ }
+ NextChunkIndex_ += other.NextChunkIndex_;
+ other.Chunks_.clear();
+ other.FreeZoneBegin_ = nullptr;
+ other.FreeZoneEnd_ = nullptr;
+ other.NextChunkIndex_ = 0;
+
+ Size_ += other.Size_;
+ Capacity_ += other.Capacity_;
+ other.Size_ = 0;
+ other.Capacity_ = 0;
+}
+
+size_t TChunkedMemoryPool::GetSize() const
+{
+ return Size_;
+}
+
+size_t TChunkedMemoryPool::GetCapacity() const
+{
+ return Capacity_;
+}
+
+size_t TChunkedMemoryPool::GetCurrentChunkSpareSize() const
+{
+ return FreeZoneEnd_ - FreeZoneBegin_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/chunked_memory_pool.h b/library/cpp/yt/memory/chunked_memory_pool.h
new file mode 100644
index 0000000000..a22e2ec27c
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_memory_pool.h
@@ -0,0 +1,159 @@
+#pragma once
+
+#include "public.h"
+#include "ref.h"
+
+#include <util/generic/size_literals.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TDefaultChunkedMemoryPoolTag { };
+
+// TAllocationHolder is polymorphic. So we cannot use TWithExtraSpace mixin
+// because it needs the most derived type as a template argument and
+// it would require GetExtraSpacePtr/GetRef methods to be virtual.
+
+class TAllocationHolder
+{
+public:
+ TAllocationHolder(TMutableRef ref, TRefCountedTypeCookie cookie);
+ TAllocationHolder(const TAllocationHolder&) = delete;
+ TAllocationHolder(TAllocationHolder&&) = default;
+ virtual ~TAllocationHolder();
+
+ void operator delete(void* ptr) noexcept;
+
+ TMutableRef GetRef() const;
+
+ template <class TDerived>
+ static TDerived* Allocate(size_t size, TRefCountedTypeCookie cookie);
+
+private:
+ const TMutableRef Ref_;
+#ifdef YT_ENABLE_REF_COUNTED_TRACKING
+ const TRefCountedTypeCookie Cookie_;
+#endif
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IMemoryChunkProvider
+ : public TRefCounted
+{
+ virtual std::unique_ptr<TAllocationHolder> Allocate(size_t size, TRefCountedTypeCookie cookie) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IMemoryChunkProvider)
+
+const IMemoryChunkProviderPtr& GetDefaultMemoryChunkProvider();
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TChunkedMemoryPool
+ : private TNonCopyable
+{
+public:
+ static constexpr size_t DefaultStartChunkSize = 4_KB;
+ static constexpr size_t RegularChunkSize = 36_KB - 512;
+
+ TChunkedMemoryPool(
+ TRefCountedTypeCookie tagCookie,
+ IMemoryChunkProviderPtr chunkProvider,
+ size_t startChunkSize = DefaultStartChunkSize);
+
+ explicit TChunkedMemoryPool(
+ TRefCountedTypeCookie tagCookie,
+ size_t startChunkSize = DefaultStartChunkSize);
+
+ TChunkedMemoryPool();
+
+ template <class TTag>
+ explicit TChunkedMemoryPool(
+ TTag,
+ size_t startChunkSize = DefaultStartChunkSize);
+
+    //! Allocates #size bytes without any alignment.
+ char* AllocateUnaligned(size_t size);
+
+    //! Allocates #size bytes aligned with the given granularity (8 bytes by default).
+ char* AllocateAligned(size_t size, int align = 8);
+
+ //! Allocates #n uninitialized instances of #T.
+ template <class T>
+ T* AllocateUninitialized(int n, int align = alignof(T));
+
+ //! Allocates space and copies #src inside it.
+ template <class T>
+ TMutableRange<T> Capture(TRange<T> src, int align = alignof(T));
+
+    //! Frees the memory range if possible: namely, if the freed region is a suffix of the last allocated region.
+ void Free(char* from, char* to);
+
+ //! Marks all previously allocated small chunks as free for subsequent allocations but
+ //! does not deallocate them.
+ //! Purges all large blocks.
+ void Clear();
+
+ //! Purges all allocated memory, including small chunks.
+ void Purge();
+
+ //! Returns the number of allocated bytes.
+ size_t GetSize() const;
+
+ //! Returns the number of reserved bytes.
+ size_t GetCapacity() const;
+
+ //! Returns the number of bytes that can be acquired in the current chunk
+ //! without additional allocations.
+ size_t GetCurrentChunkSpareSize() const;
+
+ //! Moves all the allocated memory from other memory pool to the current one.
+ //! The other pool becomes empty, like after Purge() call.
+ void Absorb(TChunkedMemoryPool&& other);
+
+private:
+ const TRefCountedTypeCookie TagCookie_;
+    // A common use case is to construct TChunkedMemoryPool with the default
+ // memory chunk provider. The latter is ref-counted and is shared between
+ // a multitude of TChunkedMemoryPool instances. This could potentially
+ // lead to a contention over IMemoryChunkProvider's ref-counter.
+ // To circumvent this, we keep both an owning (#ChunkProviderHolder_) and
+ // a non-owning (#ChunkProvider_) reference to the underlying provider.
+ // In case of the default chunk provider, the owning reference is not used.
+ const IMemoryChunkProviderPtr ChunkProviderHolder_;
+ IMemoryChunkProvider* const ChunkProvider_;
+
+ int NextChunkIndex_ = 0;
+ size_t NextSmallSize_;
+
+ size_t Size_ = 0;
+ size_t Capacity_ = 0;
+
+ // Chunk memory layout:
+ // |AAAA|....|UUUU|
+ // Legend:
+ // A aligned allocations
+ // U unaligned allocations
+ // . free zone
+ char* FreeZoneBegin_;
+ char* FreeZoneEnd_;
+
+ std::vector<std::unique_ptr<TAllocationHolder>> Chunks_;
+ std::vector<std::unique_ptr<TAllocationHolder>> OtherBlocks_;
+
+ void Initialize(size_t startChunkSize);
+
+ char* AllocateUnalignedSlow(size_t size);
+ char* AllocateAlignedSlow(size_t size, int align);
+ char* AllocateSlowCore(size_t size);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define CHUNKED_MEMORY_POOL_INL_H_
+#include "chunked_memory_pool-inl.h"
+#undef CHUNKED_MEMORY_POOL_INL_H_
diff --git a/library/cpp/yt/memory/chunked_memory_pool_output.cpp b/library/cpp/yt/memory/chunked_memory_pool_output.cpp
new file mode 100644
index 0000000000..332096445a
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_memory_pool_output.cpp
@@ -0,0 +1,65 @@
+#include "chunked_memory_pool_output.h"
+
+#include "chunked_memory_pool.h"
+
+#include <library/cpp/yt/memory/ref.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TChunkedMemoryPoolOutput::TChunkedMemoryPoolOutput(TChunkedMemoryPool* pool, size_t chunkSize)
+ : Pool_(pool)
+ , ChunkSize_(chunkSize)
+{ }
+
+size_t TChunkedMemoryPoolOutput::DoNext(void** ptr)
+{
+ // Check if the current chunk is exhausted.
+ if (Current_ == End_) {
+ // Emplace the (whole) last chunk, if any.
+ if (Begin_) {
+ Refs_.emplace_back(Begin_, Current_);
+ }
+ // Allocate a new chunk.
+ // Use |AllocateAligned| to get a chance to free some memory afterwards.
+        // Tune the number of bytes requested from the pool to try to avoid extra allocations.
+ auto spareSize = Pool_->GetCurrentChunkSpareSize();
+ auto allocationSize = (spareSize == 0 ? ChunkSize_ : std::min(ChunkSize_, spareSize));
+ Begin_ = Pool_->AllocateAligned(allocationSize, /* align */ 1);
+ Current_ = Begin_;
+ End_ = Begin_ + allocationSize;
+ }
+
+ // Return the unused part of the current chunk.
+ // This could be the whole chunk allocated above.
+ *ptr = Current_;
+ auto size = End_ - Current_;
+ Current_ = End_;
+ return size;
+}
+
+void TChunkedMemoryPoolOutput::DoUndo(size_t size)
+{
+ // Just rewind the current pointer.
+ Current_ -= size;
+ YT_VERIFY(Current_ >= Begin_);
+}
+
+std::vector<TMutableRef> TChunkedMemoryPoolOutput::Finish()
+{
+ // Emplace the used part of the last chunk, if any.
+ if (Begin_) {
+ Refs_.emplace_back(Begin_, Current_);
+ }
+ // Try to free the unused part of the last chunk, if possible.
+ if (Current_ < End_) {
+ Pool_->Free(Current_, End_);
+ }
+ return std::move(Refs_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
diff --git a/library/cpp/yt/memory/chunked_memory_pool_output.h b/library/cpp/yt/memory/chunked_memory_pool_output.h
new file mode 100644
index 0000000000..774b21788e
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_memory_pool_output.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include "public.h"
+#include "ref.h"
+
+#include <util/stream/zerocopy_output.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TChunkedMemoryPoolOutput
+ : public IZeroCopyOutput
+{
+public:
+ static constexpr size_t DefaultChunkSize = 4_KB;
+
+ explicit TChunkedMemoryPoolOutput(
+ TChunkedMemoryPool* pool,
+ size_t chunkSize = DefaultChunkSize);
+
+ std::vector<TMutableRef> Finish();
+
+private:
+ size_t DoNext(void** ptr) override;
+ void DoUndo(size_t size) override;
+
+private:
+ TChunkedMemoryPool* const Pool_;
+ const size_t ChunkSize_;
+
+ char* Begin_ = nullptr;
+ char* Current_ = nullptr;
+ char* End_ = nullptr;
+ std::vector<TMutableRef> Refs_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h
new file mode 100644
index 0000000000..86965457f8
--- /dev/null
+++ b/library/cpp/yt/memory/public.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include "ref_counted.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TChunkedMemoryPool;
+
+DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider)
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp b/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp
new file mode 100644
index 0000000000..839fe06ce6
--- /dev/null
+++ b/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp
@@ -0,0 +1,39 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <library/cpp/yt/memory/chunked_memory_pool.h>
+#include <library/cpp/yt/memory/chunked_memory_pool_output.h>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TChunkedMemoryPoolOutputTest, Basic)
+{
+ constexpr size_t PoolChunkSize = 10;
+ constexpr size_t PoolOutputChunkSize = 7;
+ TChunkedMemoryPool pool(NullRefCountedTypeCookie, PoolChunkSize);
+ TChunkedMemoryPoolOutput output(&pool, PoolOutputChunkSize);
+
+ TString s1("Short.");
+ output.Write(s1);
+
+ TString s2("Quite a long string.");
+ output.Write(s2);
+
+ char* buf;
+ auto len = output.Next(&buf);
+ output.Undo(len);
+
+ auto chunks = output.Finish();
+ TString s;
+ for (auto chunk : chunks) {
+ s += TString(chunk.Begin(), chunk.End());
+ }
+ ASSERT_EQ(s1 + s2, s);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp b/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp
new file mode 100644
index 0000000000..63562cfb96
--- /dev/null
+++ b/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp
@@ -0,0 +1,74 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <library/cpp/yt/memory/chunked_memory_pool.h>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TChunkedMemoryPoolTest, Absorb)
+{
+ TChunkedMemoryPool first;
+ TChunkedMemoryPool second;
+ TChunkedMemoryPool third;
+
+ std::vector<std::pair<TStringBuf, TString>> tests;
+ size_t totalSize = 0;
+
+ auto fillPool = [&] (TChunkedMemoryPool& pool, TString prefix, int count) {
+ for (int i = 0; i < count; i++) {
+ TString expected = prefix + ToString(count);
+ char* buf = pool.AllocateUnaligned(expected.Size());
+ ::memcpy(buf, expected.c_str(), expected.size());
+ TStringBuf ref(buf, buf + expected.size());
+ totalSize += expected.size();
+ tests.emplace_back(std::move(ref), std::move(expected));
+ }
+ };
+
+ auto checkAll = [&] {
+ ASSERT_GE(first.GetCapacity(), first.GetSize());
+ ASSERT_GE(second.GetCapacity(), second.GetSize());
+ ASSERT_GE(third.GetCapacity(), third.GetSize());
+ ASSERT_EQ(totalSize, first.GetSize() + second.GetSize() + third.GetSize());
+
+ for (const auto& [ref, expected] : tests) {
+ ASSERT_EQ(ref, expected);
+ }
+ };
+
+ auto warmup = [&] (TChunkedMemoryPool& pool, int blockSize, int count) {
+ ASSERT_EQ(pool.GetSize(), 0U);
+ for (int i = 0; i < count; i++) {
+ pool.AllocateUnaligned(blockSize);
+ }
+ pool.Clear();
+ ASSERT_EQ(pool.GetSize(), 0U);
+ };
+
+ warmup(first, 20, 20'000);
+ warmup(second, 20, 20'000);
+ fillPool(first, "firstPool", 10'000);
+ fillPool(second, "secondPool", 10'000);
+ fillPool(third, "thirdPool", 10'000);
+ checkAll();
+ second.Absorb(std::move(third));
+ checkAll();
+ first.Absorb(std::move(second));
+ checkAll();
+ fillPool(first, "poolFirst_2", 10'000);
+ fillPool(second, "poolSecond_2", 10'000);
+ fillPool(third, "poolThird_2", 10'000);
+ checkAll();
+ third.Absorb(std::move(second));
+ checkAll();
+ fillPool(second, "someStr2", 10'000);
+ fillPool(third, "someStr3", 10'000);
+ checkAll();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT