author | babenko <babenko@yandex-team.com> | 2023-01-03 13:23:49 +0300
---|---|---
committer | babenko <babenko@yandex-team.com> | 2023-01-03 13:23:49 +0300
commit | 85dbae30a801094e01a9aa5e4ecb1be070420ed4 (patch) |
tree | 1381aac9994baae96382a5231e8b1596d876320f /library |
parent | 8ee4eaa91898ce3adcdf302ba33311fc9e627282 (diff) |
download | ydb-85dbae30a801094e01a9aa5e4ecb1be070420ed4.tar.gz |
Move TChunkedMemoryPool, TChunkedMemoryAllocator, TChunkedMemoryPoolOutput to library
wip
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/yt/memory/CMakeLists.darwin.txt | 4
-rw-r--r-- | library/cpp/yt/memory/CMakeLists.linux-aarch64.txt | 4
-rw-r--r-- | library/cpp/yt/memory/CMakeLists.linux.txt | 4
-rw-r--r-- | library/cpp/yt/memory/chunked_input_stream.cpp | 34
-rw-r--r-- | library/cpp/yt/memory/chunked_input_stream.h | 29
-rw-r--r-- | library/cpp/yt/memory/chunked_memory_allocator-inl.h | 44
-rw-r--r-- | library/cpp/yt/memory/chunked_memory_allocator.cpp | 57
-rw-r--r-- | library/cpp/yt/memory/chunked_memory_allocator.h | 71
-rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool-inl.h | 142
-rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool.cpp | 200
-rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool.h | 159
-rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool_output.cpp | 65
-rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool_output.h | 43
-rw-r--r-- | library/cpp/yt/memory/public.h | 15
-rw-r--r-- | library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp | 39
-rw-r--r-- | library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp | 74
16 files changed, 984 insertions, 0 deletions
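
The central class moved here is an arena-style allocator: TChunkedMemoryPool hands out raw aligned or unaligned byte ranges carved from internally managed chunks, Clear() recycles the small chunks without returning them to the system, and Purge() releases everything. A minimal usage sketch against the declarations in chunked_memory_pool.h from the diff below; the tag type and function name are illustrative only and not part of the commit:

#include <library/cpp/yt/memory/chunked_memory_pool.h>

#include <cstring>

using namespace NYT;

// Illustrative tag type, used only for ref-counted allocation tracking.
struct TExamplePoolTag
{ };

void PoolExample()
{
    // Per-tag pool; small chunks start at DefaultStartChunkSize and grow
    // towards RegularChunkSize as allocations accumulate.
    TChunkedMemoryPool pool(TExamplePoolTag{});

    // Unaligned allocation: the bytes stay valid until Clear()/Purge().
    char* buf = pool.AllocateUnaligned(5);
    ::memcpy(buf, "hello", 5);

    // Typed allocation, aligned to alignof(T) by default.
    int* values = pool.AllocateUninitialized<int>(16);
    values[0] = 42;

    // Clear() marks the small chunks reusable without freeing them;
    // Purge() would release all memory back to the chunk provider.
    pool.Clear();
}
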
diff --git a/library/cpp/yt/memory/CMakeLists.darwin.txt b/library/cpp/yt/memory/CMakeLists.darwin.txt index 5bcfddec02..e3c3cae0f0 100644 --- a/library/cpp/yt/memory/CMakeLists.darwin.txt +++ b/library/cpp/yt/memory/CMakeLists.darwin.txt @@ -17,6 +17,10 @@ target_link_libraries(cpp-yt-memory PUBLIC ) target_sources(cpp-yt-memory PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp diff --git a/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt b/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt index 0e7c58d94f..2950017992 100644 --- a/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt +++ b/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt @@ -18,6 +18,10 @@ target_link_libraries(cpp-yt-memory PUBLIC ) target_sources(cpp-yt-memory PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp diff --git a/library/cpp/yt/memory/CMakeLists.linux.txt b/library/cpp/yt/memory/CMakeLists.linux.txt index 0e7c58d94f..2950017992 100644 --- a/library/cpp/yt/memory/CMakeLists.linux.txt +++ b/library/cpp/yt/memory/CMakeLists.linux.txt @@ -18,6 +18,10 @@ target_link_libraries(cpp-yt-memory PUBLIC ) target_sources(cpp-yt-memory PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp diff --git a/library/cpp/yt/memory/chunked_input_stream.cpp b/library/cpp/yt/memory/chunked_input_stream.cpp new file mode 100644 index 0000000000..ea579a46ab --- /dev/null +++ b/library/cpp/yt/memory/chunked_input_stream.cpp @@ -0,0 +1,34 @@ +#include "chunked_input_stream.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedInputStream::TChunkedInputStream(std::vector<TSharedRef> blocks) + : Blocks_(std::move(blocks)) +{ } + +size_t TChunkedInputStream::DoNext(const void** ptr, size_t len) +{ + SkipCompletedBlocks(); + if (Index_ == Blocks_.size()) { + *ptr = nullptr; + return 0; + } + *ptr = Blocks_[Index_].Begin() + Position_; + size_t toSkip = std::min(Blocks_[Index_].Size() - Position_, len); + Position_ += toSkip; + return toSkip; +} + +void TChunkedInputStream::SkipCompletedBlocks() +{ + while (Index_ < Blocks_.size() && Position_ == Blocks_[Index_].Size()) { + Index_ += 1; + Position_ = 
0; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_input_stream.h b/library/cpp/yt/memory/chunked_input_stream.h new file mode 100644 index 0000000000..baef2a4c66 --- /dev/null +++ b/library/cpp/yt/memory/chunked_input_stream.h @@ -0,0 +1,29 @@ +#pragma once + +#include "ref.h" + +#include <util/stream/zerocopy.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TChunkedInputStream + : public IZeroCopyInput +{ +public: + explicit TChunkedInputStream(std::vector<TSharedRef> blocks); + + size_t DoNext(const void** ptr, size_t len) override; + +private: + const std::vector<TSharedRef> Blocks_; + size_t Index_ = 0; + size_t Position_ = 0; + + void SkipCompletedBlocks(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_allocator-inl.h b/library/cpp/yt/memory/chunked_memory_allocator-inl.h new file mode 100644 index 0000000000..fe66060b2d --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_allocator-inl.h @@ -0,0 +1,44 @@ +#ifndef CHUNKED_MEMORY_ALLOCATOR_INL_H_ +#error "Direct inclusion of this file is not allowed, include chunked_memory_allocator.h" +// For the sake of sane code completion. +#include "chunked_memory_allocator.h" +#endif + +#include "serialize.h" + +#include <util/system/align.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +inline TSharedMutableRef TChunkedMemoryAllocator::AllocateUnaligned(i64 size) +{ + // Fast path. + if (FreeZoneEnd_ >= FreeZoneBegin_ + size) { + FreeZoneEnd_ -= size; + return Chunk_.Slice(FreeZoneEnd_, FreeZoneEnd_ + size); + } + + // Slow path. + return AllocateUnalignedSlow(size); +} + +inline TSharedMutableRef TChunkedMemoryAllocator::AllocateAligned(i64 size, int align) +{ + // NB: This can lead to FreeZoneBegin_ >= FreeZoneEnd_ in which case the chunk is full. + FreeZoneBegin_ = AlignUp(FreeZoneBegin_, align); + + // Fast path. + if (FreeZoneBegin_ + size <= FreeZoneEnd_) { + FreeZoneBegin_ += size; + return Chunk_.Slice(FreeZoneBegin_ - size, FreeZoneBegin_); + } + + // Slow path. 
+ return AllocateAlignedSlow(size, align); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_allocator.cpp b/library/cpp/yt/memory/chunked_memory_allocator.cpp new file mode 100644 index 0000000000..3560f8096d --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_allocator.cpp @@ -0,0 +1,57 @@ +#include "chunked_memory_allocator.h" +#include "serialize.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +const i64 TChunkedMemoryAllocator::DefaultChunkSize = 4096; +const double TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio = 0.25; + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedMemoryAllocator::TChunkedMemoryAllocator( + i64 chunkSize, + double maxSmallBlockSizeRatio, + TRefCountedTypeCookie tagCookie) + : ChunkSize_(chunkSize) + , MaxSmallBlockSize_(static_cast<i64>(ChunkSize_ * maxSmallBlockSizeRatio)) + , TagCookie_(tagCookie) +{ } + +TSharedMutableRef TChunkedMemoryAllocator::AllocateUnalignedSlow(i64 size) +{ + auto large = AllocateSlowCore(size); + if (large) { + return large; + } + return AllocateUnaligned(size); +} + +TSharedMutableRef TChunkedMemoryAllocator::AllocateAlignedSlow(i64 size, int align) +{ + // NB: Do not rely on any particular alignment of chunks. + auto large = AllocateSlowCore(size + align); + if (large) { + auto* alignedBegin = AlignUp(large.Begin(), align); + return large.Slice(alignedBegin, alignedBegin + size); + } + return AllocateAligned(size, align); +} + +TSharedMutableRef TChunkedMemoryAllocator::AllocateSlowCore(i64 size) +{ + if (size > MaxSmallBlockSize_) { + return TSharedMutableRef::Allocate(size, {.InitializeStorage = false}, TagCookie_); + } + + Chunk_ = TSharedMutableRef::Allocate(ChunkSize_, {.InitializeStorage = false}, TagCookie_); + FreeZoneBegin_ = Chunk_.Begin(); + FreeZoneEnd_ = Chunk_.End(); + + return TSharedMutableRef(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_allocator.h b/library/cpp/yt/memory/chunked_memory_allocator.h new file mode 100644 index 0000000000..33559d8aa7 --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_allocator.h @@ -0,0 +1,71 @@ +#pragma once + +#include "ref.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +struct TDefaultChunkedMemoryAllocatorTag +{ }; + +//! Mimics TChunkedMemoryPool but acts as an allocator returning shared refs. +class TChunkedMemoryAllocator + : private TNonCopyable +{ +public: + static const i64 DefaultChunkSize; + static const double DefaultMaxSmallBlockSizeRatio; + + explicit TChunkedMemoryAllocator( + i64 chunkSize = DefaultChunkSize, + double maxSmallBlockSizeRatio = DefaultMaxSmallBlockSizeRatio, + TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedMemoryAllocatorTag>()); + + template <class TTag> + explicit TChunkedMemoryAllocator( + TTag /*tag*/ = TTag(), + i64 chunkSize = DefaultChunkSize, + double maxSmallBlockSizeRatio = DefaultMaxSmallBlockSizeRatio) + : TChunkedMemoryAllocator( + chunkSize, + maxSmallBlockSizeRatio, + GetRefCountedTypeCookie<TTag>()) + { } + + //! Allocates #sizes bytes without any alignment. + TSharedMutableRef AllocateUnaligned(i64 size); + + //! Allocates #size bytes aligned with 8-byte granularity. 
+ TSharedMutableRef AllocateAligned(i64 size, int align = 8); + +private: + const i64 ChunkSize_; + const i64 MaxSmallBlockSize_; + const TRefCountedTypeCookie TagCookie_; + + char EmptyBuf_[0]; + + // Chunk memory layout: + // |AAAA|....|UUUU| + // Legend: + // A aligned allocations + // U unaligned allocations + // . free zone + char* FreeZoneBegin_ = EmptyBuf_; + char* FreeZoneEnd_ = EmptyBuf_; + TSharedMutableRef Chunk_; + + TSharedMutableRef AllocateUnalignedSlow(i64 size); + TSharedMutableRef AllocateAlignedSlow(i64 size, int align); + TSharedMutableRef AllocateSlowCore(i64 size); + +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define CHUNKED_MEMORY_ALLOCATOR_INL_H_ +#include "chunked_memory_allocator-inl.h" +#undef CHUNKED_MEMORY_ALLOCATOR_INL_H_ diff --git a/library/cpp/yt/memory/chunked_memory_pool-inl.h b/library/cpp/yt/memory/chunked_memory_pool-inl.h new file mode 100644 index 0000000000..267c3e9221 --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool-inl.h @@ -0,0 +1,142 @@ +#ifndef CHUNKED_MEMORY_POOL_INL_H_ +#error "Direct inclusion of this file is not allowed, include chunked_memory_pool.h" +// For the sake of sane code completion. +#include "chunked_memory_pool.h" +#endif + +#include "serialize.h" + +#include <library/cpp/yt/malloc/malloc.h> + +#include <util/system/align.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +inline void TAllocationHolder::operator delete(void* ptr) noexcept +{ + ::free(ptr); +} + +inline TMutableRef TAllocationHolder::GetRef() const +{ + return Ref_; +} + +template <class TDerived> +TDerived* TAllocationHolder::Allocate(size_t size, TRefCountedTypeCookie cookie) +{ + auto requestedSize = sizeof(TDerived) + size; + auto* ptr = ::malloc(requestedSize); + auto allocatedSize = ::malloc_usable_size(ptr); + if (allocatedSize) { + size += allocatedSize - requestedSize; + } + + auto* instance = static_cast<TDerived*>(ptr); + + try { + new (instance) TDerived(TMutableRef(instance + 1, size), cookie); + } catch (const std::exception& ex) { + // Do not forget to free the memory. + ::free(ptr); + throw; + } + + return instance; +} + +//////////////////////////////////////////////////////////////////////////////// + +inline TChunkedMemoryPool::TChunkedMemoryPool() + : TChunkedMemoryPool( + GetRefCountedTypeCookie<TDefaultChunkedMemoryPoolTag>()) +{ } + +template <class TTag> +inline TChunkedMemoryPool::TChunkedMemoryPool( + TTag, + size_t startChunkSize) + : TChunkedMemoryPool( + GetRefCountedTypeCookie<TTag>(), + startChunkSize) +{ } + +inline char* TChunkedMemoryPool::AllocateUnaligned(size_t size) +{ + // Fast path. + if (FreeZoneEnd_ >= FreeZoneBegin_ + size) { + FreeZoneEnd_ -= size; + Size_ += size; + return FreeZoneEnd_; + } + + // Slow path. + return AllocateUnalignedSlow(size); +} + +inline char* TChunkedMemoryPool::AllocateAligned(size_t size, int align) +{ + // NB: This can lead to FreeZoneBegin_ >= FreeZoneEnd_ in which case the chunk is full. + FreeZoneBegin_ = AlignUp(FreeZoneBegin_, align); + + // Fast path. + if (FreeZoneBegin_ + size <= FreeZoneEnd_) { + char* result = FreeZoneBegin_; + Size_ += size; + FreeZoneBegin_ += size; + return result; + } + + // Slow path. 
+ return AllocateAlignedSlow(size, align); +} + +template <class T> +inline T* TChunkedMemoryPool::AllocateUninitialized(int n, int align) +{ + return reinterpret_cast<T*>(AllocateAligned(sizeof(T) * n, align)); +} + +template <class T> +inline TMutableRange<T> TChunkedMemoryPool::Capture(TRange<T> src, int align) +{ + auto* dst = AllocateUninitialized<T>(src.Size(), align); + ::memcpy(dst, src.Begin(), sizeof(T) * src.Size()); + return TMutableRange<T>(dst, src.Size()); +} + +inline void TChunkedMemoryPool::Free(char* from, char* to) +{ + if (FreeZoneBegin_ == to) { + FreeZoneBegin_ = from; + } + if (FreeZoneEnd_ == from) { + FreeZoneEnd_ = to; + } +} + +inline void TChunkedMemoryPool::Clear() +{ + Size_ = 0; + + if (Chunks_.empty()) { + FreeZoneBegin_ = nullptr; + FreeZoneEnd_ = nullptr; + NextChunkIndex_ = 0; + } else { + FreeZoneBegin_ = Chunks_.front()->GetRef().Begin(); + FreeZoneEnd_ = Chunks_.front()->GetRef().End(); + NextChunkIndex_ = 1; + } + + for (const auto& block : OtherBlocks_) { + Capacity_ -= block->GetRef().Size(); + } + OtherBlocks_.clear(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_pool.cpp b/library/cpp/yt/memory/chunked_memory_pool.cpp new file mode 100644 index 0000000000..a10a6fe724 --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool.cpp @@ -0,0 +1,200 @@ +#include "chunked_memory_pool.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TAllocationHolder::TAllocationHolder(TMutableRef ref, TRefCountedTypeCookie cookie) + : Ref_(ref) +#ifdef YT_ENABLE_REF_COUNTED_TRACKING + , Cookie_(cookie) +#endif +{ +#ifdef YT_ENABLE_REF_COUNTED_TRACKING + if (Cookie_ != NullRefCountedTypeCookie) { + TRefCountedTrackerFacade::AllocateTagInstance(Cookie_); + TRefCountedTrackerFacade::AllocateSpace(Cookie_, Ref_.Size()); + } +#endif +} + +TAllocationHolder::~TAllocationHolder() +{ +#ifdef YT_ENABLE_REF_COUNTED_TRACKING + if (Cookie_ != NullRefCountedTypeCookie) { + TRefCountedTrackerFacade::FreeTagInstance(Cookie_); + TRefCountedTrackerFacade::FreeSpace(Cookie_, Ref_.Size()); + } +#endif +} + +//////////////////////////////////////////////////////////////////////////////// + +class TDefaultMemoryChunkProvider + : public IMemoryChunkProvider +{ +public: + std::unique_ptr<TAllocationHolder> Allocate(size_t size, TRefCountedTypeCookie cookie) override + { + return std::unique_ptr<TAllocationHolder>(TAllocationHolder::Allocate<TAllocationHolder>(size, cookie)); + } +}; + +const IMemoryChunkProviderPtr& GetDefaultMemoryChunkProvider() +{ + static const IMemoryChunkProviderPtr Result = New<TDefaultMemoryChunkProvider>(); + return Result; +} + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedMemoryPool::TChunkedMemoryPool( + TRefCountedTypeCookie tagCookie, + IMemoryChunkProviderPtr chunkProvider, + size_t startChunkSize) + : TagCookie_(tagCookie) + , ChunkProviderHolder_(std::move(chunkProvider)) + , ChunkProvider_(ChunkProviderHolder_.Get()) +{ + Initialize(startChunkSize); +} + +TChunkedMemoryPool::TChunkedMemoryPool( + TRefCountedTypeCookie tagCookie, + size_t startChunkSize) + : TagCookie_(tagCookie) + , ChunkProvider_(GetDefaultMemoryChunkProvider().Get()) +{ + Initialize(startChunkSize); +} + +void TChunkedMemoryPool::Initialize(size_t startChunkSize) +{ + NextSmallSize_ = startChunkSize; + FreeZoneBegin_ = nullptr; + FreeZoneEnd_ = nullptr; +} + +void 
TChunkedMemoryPool::Purge() +{ + Chunks_.clear(); + OtherBlocks_.clear(); + Size_ = 0; + Capacity_ = 0; + NextChunkIndex_ = 0; + FreeZoneBegin_ = nullptr; + FreeZoneEnd_ = nullptr; +} + +char* TChunkedMemoryPool::AllocateUnalignedSlow(size_t size) +{ + auto* large = AllocateSlowCore(size); + if (large) { + return large; + } + return AllocateUnaligned(size); +} + +char* TChunkedMemoryPool::AllocateAlignedSlow(size_t size, int align) +{ + // NB: Do not rely on any particular alignment of chunks. + auto* large = AllocateSlowCore(size + align); + if (large) { + return AlignUp(large, align); + } + return AllocateAligned(size, align); +} + +char* TChunkedMemoryPool::AllocateSlowCore(size_t size) +{ + TMutableRef ref; + if (size > RegularChunkSize) { + auto block = ChunkProvider_->Allocate(size, TagCookie_); + ref = block->GetRef(); + Size_ += size; + Capacity_ += ref.Size(); + OtherBlocks_.push_back(std::move(block)); + return ref.Begin(); + } + + YT_VERIFY(NextChunkIndex_ <= std::ssize(Chunks_)); + + if (NextSmallSize_ < RegularChunkSize) { + auto block = ChunkProvider_->Allocate(std::max(NextSmallSize_, size), TagCookie_); + ref = block->GetRef(); + Capacity_ += ref.Size(); + OtherBlocks_.push_back(std::move(block)); + NextSmallSize_ = 2 * ref.Size(); + } else if (NextChunkIndex_ == std::ssize(Chunks_)) { + auto chunk = ChunkProvider_->Allocate(RegularChunkSize, TagCookie_); + ref = chunk->GetRef(); + Capacity_ += ref.Size(); + Chunks_.push_back(std::move(chunk)); + ++NextChunkIndex_; + } else { + ref = Chunks_[NextChunkIndex_++]->GetRef(); + } + + FreeZoneBegin_ = ref.Begin(); + FreeZoneEnd_ = ref.End(); + + return nullptr; +} + +void TChunkedMemoryPool::Absorb(TChunkedMemoryPool&& other) +{ + YT_VERIFY(ChunkProvider_ == other.ChunkProvider_); + + OtherBlocks_.reserve(OtherBlocks_.size() + other.OtherBlocks_.size()); + for (auto& block : other.OtherBlocks_) { + OtherBlocks_.push_back(std::move(block)); + } + other.OtherBlocks_.clear(); + + // Suppose that + // - "A" is filled blocks of the current pool; + // - "a" is free blocks of the current pool; + // - "B" is filled blocks of the other pool; + // - "b" is free blocks of the other pool. + // Then, from the initial layouts "AA...Aaa...a" and "BB...Bbb...b" we obtain "BB..BAA..Aaa...abb...b". 
+ Chunks_.reserve(Chunks_.size() + other.Chunks_.size()); + size_t oldSize = Chunks_.size(); + for (auto& chunk : other.Chunks_) { + Chunks_.push_back(std::move(chunk)); + } + // Transform "AA...Aaa...aBB...B" => "BB...BAA...Aaa...a" + std::rotate(Chunks_.begin(), Chunks_.begin() + oldSize, Chunks_.begin() + oldSize + other.NextChunkIndex_); + if (NextChunkIndex_ == 0) { + FreeZoneBegin_ = other.FreeZoneBegin_; + FreeZoneEnd_ = other.FreeZoneEnd_; + } + NextChunkIndex_ += other.NextChunkIndex_; + other.Chunks_.clear(); + other.FreeZoneBegin_ = nullptr; + other.FreeZoneEnd_ = nullptr; + other.NextChunkIndex_ = 0; + + Size_ += other.Size_; + Capacity_ += other.Capacity_; + other.Size_ = 0; + other.Capacity_ = 0; +} + +size_t TChunkedMemoryPool::GetSize() const +{ + return Size_; +} + +size_t TChunkedMemoryPool::GetCapacity() const +{ + return Capacity_; +} + +size_t TChunkedMemoryPool::GetCurrentChunkSpareSize() const +{ + return FreeZoneEnd_ - FreeZoneBegin_; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_pool.h b/library/cpp/yt/memory/chunked_memory_pool.h new file mode 100644 index 0000000000..a22e2ec27c --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool.h @@ -0,0 +1,159 @@ +#pragma once + +#include "public.h" +#include "ref.h" + +#include <util/generic/size_literals.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +struct TDefaultChunkedMemoryPoolTag { }; + +// TAllocationHolder is polymorphic. So we cannot use TWithExtraSpace mixin +// because it needs the most derived type as a template argument and +// it would require GetExtraSpacePtr/GetRef methods to be virtual. + +class TAllocationHolder +{ +public: + TAllocationHolder(TMutableRef ref, TRefCountedTypeCookie cookie); + TAllocationHolder(const TAllocationHolder&) = delete; + TAllocationHolder(TAllocationHolder&&) = default; + virtual ~TAllocationHolder(); + + void operator delete(void* ptr) noexcept; + + TMutableRef GetRef() const; + + template <class TDerived> + static TDerived* Allocate(size_t size, TRefCountedTypeCookie cookie); + +private: + const TMutableRef Ref_; +#ifdef YT_ENABLE_REF_COUNTED_TRACKING + const TRefCountedTypeCookie Cookie_; +#endif +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct IMemoryChunkProvider + : public TRefCounted +{ + virtual std::unique_ptr<TAllocationHolder> Allocate(size_t size, TRefCountedTypeCookie cookie) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IMemoryChunkProvider) + +const IMemoryChunkProviderPtr& GetDefaultMemoryChunkProvider(); + +//////////////////////////////////////////////////////////////////////////////// + +class TChunkedMemoryPool + : private TNonCopyable +{ +public: + static constexpr size_t DefaultStartChunkSize = 4_KB; + static constexpr size_t RegularChunkSize = 36_KB - 512; + + TChunkedMemoryPool( + TRefCountedTypeCookie tagCookie, + IMemoryChunkProviderPtr chunkProvider, + size_t startChunkSize = DefaultStartChunkSize); + + explicit TChunkedMemoryPool( + TRefCountedTypeCookie tagCookie, + size_t startChunkSize = DefaultStartChunkSize); + + TChunkedMemoryPool(); + + template <class TTag> + explicit TChunkedMemoryPool( + TTag, + size_t startChunkSize = DefaultStartChunkSize); + + //! Allocates #sizes bytes without any alignment. + char* AllocateUnaligned(size_t size); + + //! Allocates #size bytes aligned with 8-byte granularity. 
+ char* AllocateAligned(size_t size, int align = 8); + + //! Allocates #n uninitialized instances of #T. + template <class T> + T* AllocateUninitialized(int n, int align = alignof(T)); + + //! Allocates space and copies #src inside it. + template <class T> + TMutableRange<T> Capture(TRange<T> src, int align = alignof(T)); + + //! Frees memory range if possible: namely, if the free region is a suffix of last allocated region. + void Free(char* from, char* to); + + //! Marks all previously allocated small chunks as free for subsequent allocations but + //! does not deallocate them. + //! Purges all large blocks. + void Clear(); + + //! Purges all allocated memory, including small chunks. + void Purge(); + + //! Returns the number of allocated bytes. + size_t GetSize() const; + + //! Returns the number of reserved bytes. + size_t GetCapacity() const; + + //! Returns the number of bytes that can be acquired in the current chunk + //! without additional allocations. + size_t GetCurrentChunkSpareSize() const; + + //! Moves all the allocated memory from other memory pool to the current one. + //! The other pool becomes empty, like after Purge() call. + void Absorb(TChunkedMemoryPool&& other); + +private: + const TRefCountedTypeCookie TagCookie_; + // A common usecase is to construct TChunkedMemoryPool with the default + // memory chunk provider. The latter is ref-counted and is shared between + // a multitude of TChunkedMemoryPool instances. This could potentially + // lead to a contention over IMemoryChunkProvider's ref-counter. + // To circumvent this, we keep both an owning (#ChunkProviderHolder_) and + // a non-owning (#ChunkProvider_) reference to the underlying provider. + // In case of the default chunk provider, the owning reference is not used. + const IMemoryChunkProviderPtr ChunkProviderHolder_; + IMemoryChunkProvider* const ChunkProvider_; + + int NextChunkIndex_ = 0; + size_t NextSmallSize_; + + size_t Size_ = 0; + size_t Capacity_ = 0; + + // Chunk memory layout: + // |AAAA|....|UUUU| + // Legend: + // A aligned allocations + // U unaligned allocations + // . free zone + char* FreeZoneBegin_; + char* FreeZoneEnd_; + + std::vector<std::unique_ptr<TAllocationHolder>> Chunks_; + std::vector<std::unique_ptr<TAllocationHolder>> OtherBlocks_; + + void Initialize(size_t startChunkSize); + + char* AllocateUnalignedSlow(size_t size); + char* AllocateAlignedSlow(size_t size, int align); + char* AllocateSlowCore(size_t size); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define CHUNKED_MEMORY_POOL_INL_H_ +#include "chunked_memory_pool-inl.h" +#undef CHUNKED_MEMORY_POOL_INL_H_ diff --git a/library/cpp/yt/memory/chunked_memory_pool_output.cpp b/library/cpp/yt/memory/chunked_memory_pool_output.cpp new file mode 100644 index 0000000000..332096445a --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool_output.cpp @@ -0,0 +1,65 @@ +#include "chunked_memory_pool_output.h" + +#include "chunked_memory_pool.h" + +#include <library/cpp/yt/memory/ref.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedMemoryPoolOutput::TChunkedMemoryPoolOutput(TChunkedMemoryPool* pool, size_t chunkSize) + : Pool_(pool) + , ChunkSize_(chunkSize) +{ } + +size_t TChunkedMemoryPoolOutput::DoNext(void** ptr) +{ + // Check if the current chunk is exhausted. + if (Current_ == End_) { + // Emplace the (whole) last chunk, if any. 
+ if (Begin_) { + Refs_.emplace_back(Begin_, Current_); + } + // Allocate a new chunk. + // Use |AllocateAligned| to get a chance to free some memory afterwards. + // Tune the number of bytes requested from the pool to try avoid allocations. + auto spareSize = Pool_->GetCurrentChunkSpareSize(); + auto allocationSize = (spareSize == 0 ? ChunkSize_ : std::min(ChunkSize_, spareSize)); + Begin_ = Pool_->AllocateAligned(allocationSize, /* align */ 1); + Current_ = Begin_; + End_ = Begin_ + allocationSize; + } + + // Return the unused part of the current chunk. + // This could be the whole chunk allocated above. + *ptr = Current_; + auto size = End_ - Current_; + Current_ = End_; + return size; +} + +void TChunkedMemoryPoolOutput::DoUndo(size_t size) +{ + // Just rewind the current pointer. + Current_ -= size; + YT_VERIFY(Current_ >= Begin_); +} + +std::vector<TMutableRef> TChunkedMemoryPoolOutput::Finish() +{ + // Emplace the used part of the last chunk, if any. + if (Begin_) { + Refs_.emplace_back(Begin_, Current_); + } + // Try to free the unused part of the last chunk, if possible. + if (Current_ < End_) { + Pool_->Free(Current_, End_); + } + return std::move(Refs_); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + diff --git a/library/cpp/yt/memory/chunked_memory_pool_output.h b/library/cpp/yt/memory/chunked_memory_pool_output.h new file mode 100644 index 0000000000..774b21788e --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool_output.h @@ -0,0 +1,43 @@ +#pragma once + +#include "public.h" +#include "ref.h" + +#include <util/stream/zerocopy_output.h> + +#include <util/generic/size_literals.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TChunkedMemoryPoolOutput + : public IZeroCopyOutput +{ +public: + static constexpr size_t DefaultChunkSize = 4_KB; + + explicit TChunkedMemoryPoolOutput( + TChunkedMemoryPool* pool, + size_t chunkSize = DefaultChunkSize); + + std::vector<TMutableRef> Finish(); + +private: + size_t DoNext(void** ptr) override; + void DoUndo(size_t size) override; + +private: + TChunkedMemoryPool* const Pool_; + const size_t ChunkSize_; + + char* Begin_ = nullptr; + char* Current_ = nullptr; + char* End_ = nullptr; + std::vector<TMutableRef> Refs_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h new file mode 100644 index 0000000000..86965457f8 --- /dev/null +++ b/library/cpp/yt/memory/public.h @@ -0,0 +1,15 @@ +#pragma once + +#include "ref_counted.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TChunkedMemoryPool; + +DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp b/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp new file mode 100644 index 0000000000..839fe06ce6 --- /dev/null +++ b/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp @@ -0,0 +1,39 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <library/cpp/yt/memory/chunked_memory_pool.h> +#include <library/cpp/yt/memory/chunked_memory_pool_output.h> + +namespace NYT { +namespace { + 
+//////////////////////////////////////////////////////////////////////////////// + +TEST(TChunkedMemoryPoolOutputTest, Basic) +{ + constexpr size_t PoolChunkSize = 10; + constexpr size_t PoolOutputChunkSize = 7; + TChunkedMemoryPool pool(NullRefCountedTypeCookie, PoolChunkSize); + TChunkedMemoryPoolOutput output(&pool, PoolOutputChunkSize); + + TString s1("Short."); + output.Write(s1); + + TString s2("Quite a long string."); + output.Write(s2); + + char* buf; + auto len = output.Next(&buf); + output.Undo(len); + + auto chunks = output.Finish(); + TString s; + for (auto chunk : chunks) { + s += TString(chunk.Begin(), chunk.End()); + } + ASSERT_EQ(s1 + s2, s); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp b/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp new file mode 100644 index 0000000000..63562cfb96 --- /dev/null +++ b/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp @@ -0,0 +1,74 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <library/cpp/yt/memory/chunked_memory_pool.h> + +namespace NYT { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TChunkedMemoryPoolTest, Absorb) +{ + TChunkedMemoryPool first; + TChunkedMemoryPool second; + TChunkedMemoryPool third; + + std::vector<std::pair<TStringBuf, TString>> tests; + size_t totalSize = 0; + + auto fillPool = [&] (TChunkedMemoryPool& pool, TString prefix, int count) { + for (int i = 0; i < count; i++) { + TString expected = prefix + ToString(count); + char* buf = pool.AllocateUnaligned(expected.Size()); + ::memcpy(buf, expected.c_str(), expected.size()); + TStringBuf ref(buf, buf + expected.size()); + totalSize += expected.size(); + tests.emplace_back(std::move(ref), std::move(expected)); + } + }; + + auto checkAll = [&] { + ASSERT_GE(first.GetCapacity(), first.GetSize()); + ASSERT_GE(second.GetCapacity(), second.GetSize()); + ASSERT_GE(third.GetCapacity(), third.GetSize()); + ASSERT_EQ(totalSize, first.GetSize() + second.GetSize() + third.GetSize()); + + for (const auto& [ref, expected] : tests) { + ASSERT_EQ(ref, expected); + } + }; + + auto warmup = [&] (TChunkedMemoryPool& pool, int blockSize, int count) { + ASSERT_EQ(pool.GetSize(), 0U); + for (int i = 0; i < count; i++) { + pool.AllocateUnaligned(blockSize); + } + pool.Clear(); + ASSERT_EQ(pool.GetSize(), 0U); + }; + + warmup(first, 20, 20'000); + warmup(second, 20, 20'000); + fillPool(first, "firstPool", 10'000); + fillPool(second, "secondPool", 10'000); + fillPool(third, "thirdPool", 10'000); + checkAll(); + second.Absorb(std::move(third)); + checkAll(); + first.Absorb(std::move(second)); + checkAll(); + fillPool(first, "poolFirst_2", 10'000); + fillPool(second, "poolSecond_2", 10'000); + fillPool(third, "poolThird_2", 10'000); + checkAll(); + third.Absorb(std::move(second)); + checkAll(); + fillPool(second, "someStr2", 10'000); + fillPool(third, "someStr3", 10'000); + checkAll(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT |
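
Taken together with the unit tests above, the remaining pieces compose as follows: TChunkedMemoryAllocator mirrors the pool but returns TSharedMutableRef slices instead of raw pointers, and TChunkedMemoryPoolOutput adapts a pool to the IZeroCopyOutput interface, handing back the written data from Finish() as pool-owned ranges. A hedged end-to-end sketch; the function and tag names are illustrative only:

#include <library/cpp/yt/memory/chunked_memory_allocator.h>
#include <library/cpp/yt/memory/chunked_memory_pool.h>
#include <library/cpp/yt/memory/chunked_memory_pool_output.h>

#include <cstring>
#include <vector>

using namespace NYT;

// Illustrative tag type for the allocator's ref-counted tracking.
struct TExampleAllocatorTag
{ };

void StreamingExample()
{
    // Slices are carved from 4 KB chunks; requests above
    // DefaultMaxSmallBlockSizeRatio (25%) of a chunk get a dedicated allocation.
    TChunkedMemoryAllocator allocator(TExampleAllocatorTag{});
    auto aligned = allocator.AllocateAligned(128); // 8-byte aligned by default
    ::memset(aligned.Begin(), 0, aligned.Size());
    auto unaligned = allocator.AllocateUnaligned(10);
    ::memcpy(unaligned.Begin(), "0123456789", 10);

    // Zero-copy output backed by a pool; Finish() returns the written ranges,
    // and the unused tail of the last chunk is given back to the pool.
    TChunkedMemoryPool pool;
    TChunkedMemoryPoolOutput output(&pool, /*chunkSize*/ 1024);
    output.Write("payload", 7);
    std::vector<TMutableRef> chunks = output.Finish();
}
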